summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-11-16 23:16:26 -0800
committerDana Powers <dana.powers@gmail.com>2017-11-16 23:17:55 -0800
commit16e05e7f8b4553343b47462595b9743f1f8ab900 (patch)
treebc5902bc44a17fae45f8b8017abc146af2d2a3dc
parent7bde919f2732e34cd76c858c79d965db528a0096 (diff)
downloadkafka-python-16e05e7f8b4553343b47462595b9743f1f8ab900.tar.gz
Revert ffc7caef13a120f69788bcdd43ffa01468f575f9 / PR #1239
The change caused a regression documented in issue #1290
-rw-r--r--kafka/consumer/fetcher.py9
-rw-r--r--test/test_fetcher.py23
2 files changed, 2 insertions, 30 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 661df63..e4d76cf 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -838,17 +838,12 @@ class Fetcher(six.Iterator):
return parsed_records
- class PartitionRecords(object):
+ class PartitionRecords(six.Iterator):
def __init__(self, fetch_offset, tp, messages):
self.fetch_offset = fetch_offset
self.topic_partition = tp
self.messages = messages
- # When fetching an offset that is in the middle of a
- # compressed batch, we will get all messages in the batch.
- # But we want to start 'take' at the fetch_offset
- for i, msg in enumerate(messages):
- if msg.offset == fetch_offset:
- self.message_idx = i
+ self.message_idx = 0
# For truthiness evaluation we need to define __len__ or __nonzero__
def __len__(self):
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index ef3f686..429071a 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -303,29 +303,6 @@ def test__handle_offset_response(fetcher, mocker):
assert isinstance(fut.exception, NotLeaderForPartitionError)
-def test_partition_records_offset():
- """Test that compressed messagesets are handled correctly
- when fetch offset is in the middle of the message list
- """
- batch_start = 120
- batch_end = 130
- fetch_offset = 123
- tp = TopicPartition('foo', 0)
- messages = [ConsumerRecord(tp.topic, tp.partition, i,
- None, None, 'key', 'value', 'checksum', 0, 0)
- for i in range(batch_start, batch_end)]
- records = Fetcher.PartitionRecords(fetch_offset, None, messages)
- assert len(records) > 0
- msgs = records.take(1)
- assert msgs[0].offset == 123
- assert records.fetch_offset == 124
- msgs = records.take(2)
- assert len(msgs) == 2
- assert len(records) > 0
- records.discard()
- assert len(records) == 0
-
-
def test_fetched_records(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)