summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-06 14:00:28 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-06 14:00:28 -0700
commit331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2 (patch)
tree32734b7dedfe5d4819f504e53dc5fd5068b9a9c2
parent4c965133df2cfcae2e7a6b7fdcea9ce28a21f9ac (diff)
parent78ad43600c469c05a5b0e32c6be27048749cd58e (diff)
downloadkafka-python-331442ee0fcc0d888c2b2d2ed4f2a339d167b4a2.tar.gz
Merge pull request #634 from dpkp/fetch
Small improvements to fetching logic
-rw-r--r--kafka/consumer/fetcher.py50
1 files changed, 32 insertions, 18 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 2883bd8..8ce573b 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -338,6 +338,8 @@ class Fetcher(six.Iterator):
for record in self._unpack_message_set(tp, messages):
# Fetched compressed messages may include additional records
if record.offset < fetch_offset:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ record.offset, fetch_offset)
continue
drained[tp].append(record)
else:
@@ -349,22 +351,22 @@ class Fetcher(six.Iterator):
return dict(drained)
def _unpack_message_set(self, tp, messages):
- for offset, size, msg in messages:
- if self.config['check_crcs'] and not msg.validate_crc():
- raise Errors.InvalidMessageError(msg)
- elif msg.is_compressed():
- for record in self._unpack_message_set(tp, msg.decompress()):
- yield record
- else:
- try:
+ try:
+ for offset, size, msg in messages:
+ if self.config['check_crcs'] and not msg.validate_crc():
+ raise Errors.InvalidMessageError(msg)
+ elif msg.is_compressed():
+ for record in self._unpack_message_set(tp, msg.decompress()):
+ yield record
+ else:
key, value = self._deserialize(msg)
- # If the deserializer raises StopIteration, it is erroneously
- # caught by the generator. We want all exceptions to be raised
- # back to the user. See Issue 545
- except StopIteration as e:
- log.exception('Deserializer raised StopIteration: %s', e)
- raise Exception('Deserializer raised StopIteration')
- yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+ # If unpacking raises StopIteration, it is erroneously
+ # caught by the generator. We want all exceptions to be raised
+ # back to the user. See Issue 545
+ except StopIteration as e:
+ log.exception('StopIteration raised unpacking messageset: %s', e)
+ raise Exception('StopIteration raised unpacking messageset')
def _message_generator(self):
"""Iterate over fetched_records"""
@@ -419,6 +421,9 @@ class Fetcher(six.Iterator):
# Compressed messagesets may include earlier messages
# It is also possible that the user called seek()
elif msg.offset != self._subscriptions.assignment[tp].position:
+ log.debug("Skipping message offset: %s (expecting %s)",
+ msg.offset,
+ self._subscriptions.assignment[tp].position)
continue
self._subscriptions.assignment[tp].position = msg.offset + 1
@@ -532,15 +537,24 @@ class Fetcher(six.Iterator):
# which can be passed to FetchRequest() via .items()
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
+ # avoid re-fetching pending offsets
+ pending = set()
+ for fetch_offset, tp, _ in self._records:
+ pending.add((tp, fetch_offset))
+
for partition in self._subscriptions.fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
+ position = self._subscriptions.assignment[partition].position
+
+ # fetch if there is a leader, no in-flight requests, and no _records
if node_id is None or node_id == -1:
log.debug("No leader found for partition %s."
" Requesting metadata update", partition)
self._client.cluster.request_update()
- elif self._client.in_flight_request_count(node_id) == 0:
- # fetch if there is a leader and no in-flight requests
- position = self._subscriptions.assignment[partition].position
+
+ elif ((partition, position) not in pending and
+ self._client.in_flight_request_count(node_id) == 0):
+
partition_info = (
partition.partition,
position,