diff options
| author | Dana Powers <dana.powers@gmail.com> | 2017-10-05 14:19:52 -0700 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-10-05 14:19:52 -0700 | 
| commit | ffc7caef13a120f69788bcdd43ffa01468f575f9 (patch) | |
| tree | 978b5a04e589c92124af9c5a0e32ccf24912e1c7 /test/test_fetcher.py | |
| parent | cec1bdc9965b3d6729d4415e31b4dac04d603873 (diff) | |
| download | kafka-python-ffc7caef13a120f69788bcdd43ffa01468f575f9.tar.gz | |
Fix Fetcher.PartitionRecords to handle fetch_offset in the middle of compressed messageset (#1239)
Diffstat (limited to 'test/test_fetcher.py')
| -rw-r--r-- | test/test_fetcher.py | 25 | 
1 files changed, 24 insertions, 1 deletions
| diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 64eec1b..86d154f 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -7,7 +7,7 @@ import itertools  from collections import OrderedDict  from kafka.client_async import KafkaClient -from kafka.consumer.fetcher import Fetcher, NoOffsetForPartitionError +from kafka.consumer.fetcher import ConsumerRecord, Fetcher, NoOffsetForPartitionError  from kafka.consumer.subscription_state import SubscriptionState  from kafka.metrics import Metrics  from kafka.protocol.fetch import FetchRequest @@ -282,3 +282,26 @@ def test__handle_offset_response(fetcher, mocker):      fetcher._handle_offset_response(fut, res)      assert fut.failed()      assert isinstance(fut.exception, NotLeaderForPartitionError) + + +def test_partition_records_offset(): +    """Test that compressed messagesets are handle correctly +    when fetch offset is in the middle of the message list +    """ +    batch_start = 120 +    batch_end = 130 +    fetch_offset = 123 +    tp = TopicPartition('foo', 0) +    messages = [ConsumerRecord(tp.topic, tp.partition, i, +                               None, None, 'key', 'value', 'checksum', 0, 0) +                for i in range(batch_start, batch_end)] +    records = Fetcher.PartitionRecords(fetch_offset, None, messages) +    assert records.has_more() +    msgs = records.take(1) +    assert msgs[0].offset == 123 +    assert records.fetch_offset == 124 +    msgs = records.take(2) +    assert len(msgs) == 2 +    assert records.has_more() +    records.discard() +    assert not records.has_more() | 
