summaryrefslogtreecommitdiff
path: root/test/test_sender.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_sender.py')
-rw-r--r--test/test_sender.py18
1 files changed, 9 insertions, 9 deletions
diff --git a/test/test_sender.py b/test/test_sender.py
index f37e194..2a68def 100644
--- a/test/test_sender.py
+++ b/test/test_sender.py
@@ -1,20 +1,17 @@
# pylint: skip-file
from __future__ import absolute_import
-import io
-
import pytest
+import io
from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
-import kafka.errors as Errors
-from kafka.future import Future
from kafka.metrics import Metrics
-from kafka.producer.buffer import MessageSetBuffer
from kafka.protocol.produce import ProduceRequest
-from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
from kafka.producer.sender import Sender
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.record.memory_records import MemoryRecordsBuilder
+from kafka.structs import TopicPartition
@pytest.fixture
@@ -47,7 +44,10 @@ def sender(client, accumulator, metrics):
def test_produce_request(sender, mocker, api_version, produce_version):
sender.config['api_version'] = api_version
tp = TopicPartition('foo', 0)
- records = MessageSetBuffer(io.BytesIO(), 100000)
- batch = RecordBatch(tp, records)
+ buffer = io.BytesIO()
+ records = MemoryRecordsBuilder(
+ magic=1, compression_type=0, batch_size=100000)
+ batch = ProducerBatch(tp, records, buffer)
+ records.close()
produce_request = sender._produce_request(0, 0, 0, [batch])
assert isinstance(produce_request, ProduceRequest[produce_version])