summary refs log tree commit diff
path: root/test/test_fetcher.py
diff options
context:
space:
mode:
author: Taras Voinarovskyi <voyn1991@gmail.com> 2017-10-14 23:06:27 +0300
committer: GitHub <noreply@github.com> 2017-10-14 23:06:27 +0300
commit: fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch)
tree: 52e5860b1f8738b15e7c757c205961b761badd2b /test/test_fetcher.py
parent: dd8e33654f2270097d6c1373dc272153670e48f8 (diff)
parent: 365cae02da59721df77923bb5f5a2d94a84b2e83 (diff)
download: kafka-python-fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f.tar.gz
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
Diffstat (limited to 'test/test_fetcher.py')
-rw-r--r--  test/test_fetcher.py | 122
1 file changed, 27 insertions, 95 deletions
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index 5da597c..364a808 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -8,22 +8,20 @@ import itertools
import time
from kafka.client_async import KafkaClient
-from kafka.codec import gzip_encode
from kafka.consumer.fetcher import (
CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError
)
from kafka.consumer.subscription_state import SubscriptionState
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest, FetchResponse
-from kafka.protocol.message import Message
from kafka.protocol.offset import OffsetResponse
-from kafka.protocol.types import Int64, Int32
from kafka.structs import TopicPartition
from kafka.future import Future
from kafka.errors import (
StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError,
UnknownTopicOrPartitionError, OffsetOutOfRangeError
)
+from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords
@pytest.fixture
@@ -51,6 +49,16 @@ def fetcher(client, subscription_state, topic):
return Fetcher(client, subscription_state, Metrics())
def _build_record_batch(msgs, compression=0):
    """Encode ``(key, value, timestamp)`` tuples into a magic-v1 record batch.

    Args:
        msgs: iterable of ``(key, value, timestamp)`` tuples.
        compression: record-batch compression type (0 = none).

    Returns:
        The encoded batch as bytes, suitable for feeding to MemoryRecords.
    """
    # Bug fix: ``compression`` was accepted but ignored — compression_type
    # was hard-coded to 0, so every caller got an uncompressed batch.
    builder = MemoryRecordsBuilder(
        magic=1, compression_type=compression, batch_size=9999999)
    for msg in msgs:
        key, value, timestamp = msg
        builder.append(key=key, value=value, timestamp=timestamp)
    builder.close()
    return builder.buffer()
+
+
def test_send_fetches(fetcher, topic, mocker):
fetch_requests = [
FetchRequest[0](
@@ -321,12 +329,12 @@ def test_partition_records_offset():
def test_fetched_records(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
+
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -401,11 +409,12 @@ def test__unpack_message_set(fetcher):
fetcher.config['check_crcs'] = False
tp = TopicPartition('foo', 0)
messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c'))
+ (None, b"a", None),
+ (None, b"b", None),
+ (None, b"c", None),
]
- records = list(fetcher._unpack_message_set(tp, messages))
+ memory_records = MemoryRecords(_build_record_batch(messages))
+ records = list(fetcher._unpack_message_set(tp, memory_records))
assert len(records) == 3
assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
assert records[0].value == b'a'
@@ -416,88 +425,14 @@ def test__unpack_message_set(fetcher):
assert records[2].offset == 2
-def test__unpack_message_set_compressed_v0(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 0
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=0,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 0
- assert records[1].offset == 1
- assert records[2].offset == 2
-
-
-def test__unpack_message_set_compressed_v1(fetcher):
- fetcher.config['check_crcs'] = False
- tp = TopicPartition('foo', 0)
- messages = [
- (0, None, Message(b'a')),
- (1, None, Message(b'b')),
- (2, None, Message(b'c')),
- ]
- message_bytes = []
- for offset, _, m in messages:
- encoded = m.encode()
- message_bytes.append(Int64.encode(offset) + Int32.encode(len(encoded)) + encoded)
- compressed_bytes = gzip_encode(b''.join(message_bytes))
- compressed_base_offset = 10
- compressed_msgs = [
- (compressed_base_offset, None,
- Message(compressed_bytes,
- magic=1,
- attributes=Message.CODEC_GZIP))
- ]
- records = list(fetcher._unpack_message_set(tp, compressed_msgs))
- assert len(records) == 3
- assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
- assert records[0].value == b'a'
- assert records[1].value == b'b'
- assert records[2].value == b'c'
- assert records[0].offset == 8
- assert records[1].offset == 9
- assert records[2].offset == 10
-
-
-def test__parse_record(fetcher):
- tp = TopicPartition('foo', 0)
- record = fetcher._parse_record(tp, 123, 456, Message(b'abc'))
- assert record.topic == 'foo'
- assert record.partition == 0
- assert record.offset == 123
- assert record.timestamp == 456
- assert record.value == b'abc'
- assert record.key is None
-
-
def test__message_generator(fetcher, topic, mocker):
fetcher.config['check_crcs'] = False
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._completed_fetches.append(completed_fetch)
@@ -513,10 +448,9 @@ def test__parse_fetched_data(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)
@@ -529,10 +463,9 @@ def test__parse_fetched_data__paused(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 0, 0, [0, 100, msgs],
+ tp, 0, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
fetcher._subscriptions.pause(tp)
@@ -545,10 +478,9 @@ def test__parse_fetched_data__stale_offset(fetcher, topic, mocker):
tp = TopicPartition(topic, 0)
msgs = []
for i in range(10):
- msg = Message(b'foo')
- msgs.append((i, -1, msg))
+ msgs.append((None, b"foo", None))
completed_fetch = CompletedFetch(
- tp, 10, 0, [0, 100, msgs],
+ tp, 10, 0, [0, 100, _build_record_batch(msgs)],
mocker.MagicMock()
)
partition_record = fetcher._parse_fetched_data(completed_fetch)