From 358b4820744c42d47171f17a5b373a1c89f520bb Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Wed, 6 Apr 2016 11:46:20 -0700
Subject: Log debug messages when skipping fetched messages due to offset checks

---
 kafka/consumer/fetcher.py | 5 +++++
 1 file changed, 5 insertions(+)
(limited to 'kafka/consumer')

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index c1f98eb..375090a 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -338,6 +338,8 @@ class Fetcher(six.Iterator):
             for record in self._unpack_message_set(tp, messages):
                 # Fetched compressed messages may include additional records
                 if record.offset < fetch_offset:
+                    log.debug("Skipping message offset: %s (expecting %s)",
+                              record.offset, fetch_offset)
                     continue
                 drained[tp].append(record)
         else:
@@ -419,6 +421,9 @@ class Fetcher(six.Iterator):
             # Compressed messagesets may include earlier messages
             # It is also possible that the user called seek()
             elif msg.offset != self._subscriptions.assignment[tp].position:
+                log.debug("Skipping message offset: %s (expecting %s)",
+                          msg.offset,
+                          self._subscriptions.assignment[tp].position)
                 continue
             self._subscriptions.assignment[tp].position = msg.offset + 1
--
cgit v1.2.1


From 3ef15f9d60af01ce397737b4d356618385b8884f Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Wed, 6 Apr 2016 11:47:07 -0700
Subject: Increase coverage of StopIteration check in _unpack_message_set

---
 kafka/consumer/fetcher.py | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)
(limited to 'kafka/consumer')

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 375090a..71d2ed2 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -351,22 +351,22 @@ class Fetcher(six.Iterator):
         return dict(drained)
 
     def _unpack_message_set(self, tp, messages):
-        for offset, size, msg in messages:
-            if self.config['check_crcs'] and not msg.validate_crc():
-                raise Errors.InvalidMessageError(msg)
-            elif msg.is_compressed():
-                for record in self._unpack_message_set(tp, msg.decompress()):
-                    yield record
-            else:
-                try:
+        try:
+            for offset, size, msg in messages:
+                if self.config['check_crcs'] and not msg.validate_crc():
+                    raise Errors.InvalidMessageError(msg)
+                elif msg.is_compressed():
+                    for record in self._unpack_message_set(tp, msg.decompress()):
+                        yield record
+                else:
                     key, value = self._deserialize(msg)
-                    # If the deserializer raises StopIteration, it is erroneously
-                    # caught by the generator. We want all exceptions to be raised
-                    # back to the user. See Issue 545
-                except StopIteration as e:
-                    log.exception('Deserializer raised StopIteration: %s', e)
-                    raise Exception('Deserializer raised StopIteration')
-                yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+        # If unpacking raises StopIteration, it is erroneously
+        # caught by the generator. We want all exceptions to be raised
+        # back to the user. See Issue 545
+        except StopIteration as e:
+            log.exception('StopIteration raised unpacking messageset: %s', e)
+            raise Exception('StopIteration raised unpacking messageset')
 
     def _message_generator(self):
         """Iterate over fetched_records"""
--
cgit v1.2.1


From 78ad43600c469c05a5b0e32c6be27048749cd58e Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Wed, 6 Apr 2016 11:47:41 -0700
Subject: Don't send FetchRequest for (obviously) pending data

---
 kafka/consumer/fetcher.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
(limited to 'kafka/consumer')

diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 71d2ed2..4769c2e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -537,15 +537,24 @@ class Fetcher(six.Iterator):
         # which can be passed to FetchRequest() via .items()
         fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
 
+        # avoid re-fetching pending offsets
+        pending = set()
+        for fetch_offset, tp, _ in self._records:
+            pending.add((tp, fetch_offset))
+
         for partition in self._subscriptions.fetchable_partitions():
             node_id = self._client.cluster.leader_for_partition(partition)
+            position = self._subscriptions.assignment[partition].position
+
+            # fetch if there is a leader, no in-flight requests, and no _records
             if node_id is None or node_id == -1:
                 log.debug("No leader found for partition %s."
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()
-            elif self._client.in_flight_request_count(node_id) == 0:
-                # fetch if there is a leader and no in-flight requests
-                position = self._subscriptions.assignment[partition].position
+
+            elif ((partition, position) not in pending and
+                  self._client.in_flight_request_count(node_id) == 0):
+
                 partition_info = (
                     partition.partition,
                     position,
--
cgit v1.2.1
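
Why the first patch's skip is needed at all: brokers return compressed messagesets whole, so a fetch positioned inside a compressed batch can come back with inner records that precede the requested offset, and the consumer must drop them after decompression. A standalone sketch of that skip loop; the offsets and variable names are made up for illustration, only the log format string mirrors the patch:

    fetch_offset = 102                   # where the consumer's position actually is
    decompressed = [100, 101, 102, 103]  # inner offsets recovered from one compressed batch

    drained = []
    for offset in decompressed:
        if offset < fetch_offset:
            # mirrors the new log.debug added by the patch
            print("Skipping message offset: %s (expecting %s)" % (offset, fetch_offset))
            continue
        drained.append(offset)

    print(drained)  # -> [102, 103]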
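
The second patch works around a classic generator pitfall (kafka-python Issue 545): _unpack_message_set is a generator, and on Python versions before PEP 479 took effect (3.7), a StopIteration that escapes a generator frame looks like normal exhaustion, so a buggy deserializer would silently truncate the messageset instead of surfacing an error. A minimal, self-contained sketch of the failure mode and the guard; all names below are hypothetical stand-ins, not the real Fetcher API:

    def bad_deserializer(value):
        raise StopIteration('buggy deserializer')

    def unpack(messages, deserializer):
        # unguarded: a StopIteration from the deserializer escapes the frame
        for msg in messages:
            yield deserializer(msg)

    def unpack_safe(messages, deserializer):
        try:
            for msg in messages:
                yield deserializer(msg)
        except StopIteration as e:
            # convert to a plain Exception so it reaches the caller,
            # as the patch does above
            raise Exception('StopIteration raised unpacking messageset: %s' % e)

    try:
        # Python < 3.7 prints []: the error is swallowed and both
        # messages are silently dropped
        print(list(unpack([b'a', b'b'], bad_deserializer)))
    except RuntimeError as e:
        # Python 3.7+ (PEP 479) converts the escaping StopIteration
        # into a RuntimeError instead of losing it
        print('RuntimeError: %s' % e)

    try:
        print(list(unpack_safe([b'a', b'b'], bad_deserializer)))
    except Exception as e:
        # with the guard, the caller always sees a normal exception
        print('Exception: %s' % e)

Note that the patch widens the try block to cover the whole loop, so a StopIteration raised during CRC validation or decompression is converted too, not just one raised by the deserializer; that is the "increased coverage" in the subject line.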
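
The third patch's bookkeeping is easiest to see in isolation: before building new FetchRequests, the fetcher collects the (partition, fetch_offset) pairs of completed fetches still sitting in self._records and skips any partition whose current position is already covered, since draining those buffered records will advance the position anyway. A simplified sketch with made-up stand-ins for the Fetcher's state:

    # 'records' stands in for self._records (completed fetches not yet drained);
    # 'positions' stands in for the consumer's per-partition positions
    records = [(100, ('my-topic', 0), ['msg100', 'msg101'])]
    positions = {('my-topic', 0): 100, ('my-topic', 1): 42}

    # avoid re-fetching pending offsets
    pending = set()
    for fetch_offset, tp, _ in records:
        pending.add((tp, fetch_offset))

    for tp, position in sorted(positions.items()):
        if (tp, position) in pending:
            # data for this exact position is already buffered locally;
            # another FetchRequest would just duplicate it
            continue
        print('would fetch %s partition %d from offset %d' % (tp[0], tp[1], position))
    # -> would fetch my-topic partition 1 from offset 42

Without this check, every poll between a fetch completing and its records being drained could re-request the same offsets, hence the "(obviously) pending data" in the subject line.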