summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/fetcher.py9
-rw-r--r--test/test_fetcher.py23
2 files changed, 2 insertions, 30 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 661df63..e4d76cf 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -838,17 +838,12 @@ class Fetcher(six.Iterator):
return parsed_records
- class PartitionRecords(object):
+ class PartitionRecords(six.Iterator):
def __init__(self, fetch_offset, tp, messages):
self.fetch_offset = fetch_offset
self.topic_partition = tp
self.messages = messages
- # When fetching an offset that is in the middle of a
- # compressed batch, we will get all messages in the batch.
- # But we want to start 'take' at the fetch_offset
- for i, msg in enumerate(messages):
- if msg.offset == fetch_offset:
- self.message_idx = i
+ self.message_idx = 0
# For truthiness evaluation we need to define __len__ or __nonzero__
def __len__(self):
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index ef3f686..429071a 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -303,29 +303,6 @@ def test__handle_offset_response(fetcher, mocker):
assert isinstance(fut.exception, NotLeaderForPartitionError)
-def test_partition_records_offset():
- """Test that compressed messagesets are handled correctly
- when fetch offset is in the middle of the message list
- """
- batch_start = 120
- batch_end = 130
- fetch_offset = 123
- tp = TopicPartition('foo', 0)
- messages = [ConsumerRecord(tp.topic, tp.partition, i,
- None, None, 'key', 'value', 'checksum', 0, 0)
- for i in range(batch_start, batch_end)]
- records = Fetcher.PartitionRecords(fetch_offset, None, messages)
- assert len(records) > 0
- msgs = records.take(1)
- assert msgs[0].offset == 123
- assert records.fetch_offset == 124
- msgs = records.take(2)
- assert len(msgs) == 2
- assert len(records) > 0
- records.discard()
- assert len(records) == 0
-
-
def test_fetched_records(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)