diff options
Diffstat (limited to 'test/test_sender.py')
-rw-r--r-- | test/test_sender.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/test/test_sender.py b/test/test_sender.py new file mode 100644 index 0000000..bb9068e --- /dev/null +++ b/test/test_sender.py @@ -0,0 +1,47 @@ +# pylint: skip-file +from __future__ import absolute_import + +import io + +import pytest + +from kafka.client_async import KafkaClient +from kafka.cluster import ClusterMetadata +from kafka.producer.buffer import MessageSetBuffer +from kafka.producer.sender import Sender +from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch +import kafka.errors as Errors +from kafka.future import Future +from kafka.protocol.produce import ProduceRequest +from kafka.structs import TopicPartition, OffsetAndMetadata + + +@pytest.fixture +def client(mocker): + _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[])) + _cli.cluster = mocker.Mock(spec=ClusterMetadata()) + return _cli + + +@pytest.fixture +def accumulator(): + return RecordAccumulator() + + +@pytest.fixture +def sender(client, accumulator): + return Sender(client, client.cluster, accumulator) + + +@pytest.mark.parametrize(("api_version", "produce_version"), [ + ((0, 10), 2), + ((0, 9), 1), + ((0, 8), 0) +]) +def test_produce_request(sender, mocker, api_version, produce_version): + sender.config['api_version'] = api_version + tp = TopicPartition('foo', 0) + records = MessageSetBuffer(io.BytesIO(), 100000) + batch = RecordBatch(tp, records) + produce_request = sender._produce_request(0, 0, 0, [batch]) + assert isinstance(produce_request, ProduceRequest[produce_version]) |