# pylint: skip-file
from __future__ import absolute_import

import io

import pytest

from kafka.client_async import KafkaClient
from kafka.cluster import ClusterMetadata
from kafka.metrics import Metrics
from kafka.protocol.produce import ProduceRequest
from kafka.producer.record_accumulator import RecordAccumulator, ProducerBatch
from kafka.producer.sender import Sender
from kafka.record.memory_records import MemoryRecordsBuilder
from kafka.structs import TopicPartition


@pytest.fixture
def client(mocker):
    """Mock KafkaClient with a mock ClusterMetadata attached."""
    _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=(), api_version=(0, 9)))
    _cli.cluster = mocker.Mock(spec=ClusterMetadata())
    return _cli


@pytest.fixture
def accumulator():
    """Real RecordAccumulator with default configuration."""
    return RecordAccumulator()


@pytest.fixture
def metrics():
    """Real Metrics registry with default configuration."""
    return Metrics()


@pytest.fixture
def sender(client, accumulator, metrics):
    """Sender wired to the mocked client and real accumulator/metrics."""
    return Sender(client, client.cluster, accumulator, metrics)


@pytest.mark.parametrize(("api_version", "produce_version"), [
    ((0, 10), 2),
    ((0, 9), 1),
    ((0, 8), 0)
])
def test_produce_request(sender, api_version, produce_version):
    """Each broker api_version should map to the matching ProduceRequest version."""
    sender.config['api_version'] = api_version
    tp = TopicPartition('foo', 0)
    buffer = io.BytesIO()
    records = MemoryRecordsBuilder(
        magic=1, compression_type=0, batch_size=100000)
    batch = ProducerBatch(tp, records, buffer)
    records.close()
    # _produce_request(node_id, acks, timeout, batches)
    produce_request = sender._produce_request(0, 0, 0, [batch])
    assert isinstance(produce_request, ProduceRequest[produce_version])
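

# A hedged sketch of a companion test that exercises a non-empty batch rather
# than an empty one. It assumes MemoryRecordsBuilder.append(timestamp, key,
# value), which matches the builder's signature in kafka-python at the time
# this file was written; everything else reuses the fixtures above.
def test_produce_request_with_record(sender):
    sender.config['api_version'] = (0, 10)
    tp = TopicPartition('foo', 0)
    records = MemoryRecordsBuilder(
        magic=1, compression_type=0, batch_size=100000)
    # Append one record before closing so the batch carries a payload.
    records.append(timestamp=10000, key=b'key', value=b'value')
    records.close()
    batch = ProducerBatch(tp, records, io.BytesIO())
    produce_request = sender._produce_request(0, 0, 0, [batch])
    # (0, 10) maps to ProduceRequest v2, as in the parametrized cases above.
    assert isinstance(produce_request, ProduceRequest[2])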