-rw-r--r--  kafka/consumer/group.py | 20
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7b59567..77b0b96 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -303,7 +303,7 @@ class KafkaConsumer(six.Iterator):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
- 'legacy_iterator': True, # experimental feature
+ 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
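With the default flipped to False above, the poll()-based iterator is now the standard path. A minimal sketch of opting back into the pre-1.4.7 behavior via the 'legacy_iterator' config key (broker address and topic name are illustrative):

    from kafka import KafkaConsumer

    # Passing legacy_iterator=True reverts to the < 1.4.7 iterator
    # implementation instead of the new poll()-based generator.
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        legacy_iterator=True,
    )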
@@ -598,7 +598,7 @@ class KafkaConsumer(six.Iterator):
partitions = cluster.partitions_for_topic(topic)
return partitions
- def poll(self, timeout_ms=0, max_records=None):
+ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -622,6 +622,12 @@ class KafkaConsumer(six.Iterator):
dict: Topic to list of records since the last fetch for the
subscribed list of topics and partitions.
"""
+ # Note: update_offsets is an internal-use only argument. It is used to
+ # support the python iterator interface, which wraps consumer.poll()
+ # and requires that the partition offsets tracked by the fetcher are not
+ # updated until the iterator returns each record to the user. As such,
+ # the argument is not documented and library users should not rely on
+ # it, as it may break in the future.
assert timeout_ms >= 0, 'Timeout must not be negative'
if max_records is None:
max_records = self.config['max_poll_records']
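The new update_offsets parameter defaults to True, so documented external usage of poll() is unchanged. A hedged sketch of the typical call pattern (topic, group, and broker names are illustrative):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer('my-topic', group_id='my-group',
                             bootstrap_servers='localhost:9092')
    # poll() returns {TopicPartition: [ConsumerRecord, ...]}
    record_map = consumer.poll(timeout_ms=500, max_records=100)
    for tp, records in record_map.items():
        for record in records:
            print(tp.topic, tp.partition, record.offset, record.value)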
@@ -632,7 +638,7 @@ class KafkaConsumer(six.Iterator):
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining, max_records)
+ records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
if records:
return records
@@ -642,7 +648,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
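The lines elided between the two hunks above recompute the remaining budget on each pass; a sketch of that standard pattern, assuming the start and timeout_ms variables shown:

    elapsed_ms = (time.time() - start) * 1000
    remaining = timeout_ms - elapsed_ms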
- def _poll_once(self, timeout_ms, max_records):
+ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
"""Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -661,7 +667,7 @@ class KafkaConsumer(six.Iterator):
# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
- records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+ records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
if records:
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
@@ -681,7 +687,7 @@ class KafkaConsumer(six.Iterator):
if self._coordinator.need_rejoin():
return {}
- records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+ records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records
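The update_offsets flag controls whether the fetch layer advances its tracked positions as it hands records out. An illustrative standalone sketch of that contract (not the library's internals):

    def fetched_records_sketch(buffered, position, max_records, update_offsets=True):
        # Hand back up to max_records buffered records. With
        # update_offsets=True the position jumps past the whole batch;
        # with False the caller must advance it record by record.
        records = buffered[:max_records]
        if update_offsets and records:
            position = records[-1].offset + 1
        return records, position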
def position(self, partition):
@@ -1089,7 +1095,7 @@ class KafkaConsumer(six.Iterator):
def _message_generator_v2(self):
timeout_ms = 1000 * (self._consumer_timeout - time.time())
- record_map = self.poll(timeout_ms=timeout_ms)
+ record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
for tp, records in six.iteritems(record_map):
# Generators are stateful, and it is possible that the tp / records
# here may become stale during iteration -- i.e., we seek to a
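The generator then advances the tracked position one record at a time as it yields, which is why the poll() call above passes update_offsets=False. An illustrative standalone sketch of that per-record bookkeeping (positions is a stand-in for the consumer's internal subscription state):

    positions = {}  # TopicPartition -> next expected offset

    def yield_records(tp, records):
        for record in records:
            # A seek() during iteration makes buffered records stale;
            # stop rather than yield out-of-order data.
            if positions.get(tp, record.offset) != record.offset:
                break
            positions[tp] = record.offset + 1  # advance past this record only
            yield record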