commit c9bff0701ea153933dee8c03cf8feb3470e39c52 (branch: produce_fetch_v2)
tree e9cc1eddfd4f03981762127d035c58db9c9a5269
parent 7941a2ac7ec6663f08c6291d92746eae9f792916
Author:    Dana Powers <dana.powers@gmail.com>  2016-05-22 11:35:26 -0700
Committer: Dana Powers <dana.powers@gmail.com>  2016-05-22 11:35:26 -0700

    Use Fetch/Produce API v2 for brokers >= 0.10 (uses message format v1)
---
 kafka/consumer/fetcher.py |  7
 kafka/errors.py           |  6
 kafka/producer/kafka.py   |  3
 kafka/producer/sender.py  | 16
 kafka/protocol/fetch.py   | 17
 kafka/protocol/produce.py | 26
 test/test_fetcher.py      | 15
 test/test_sender.py       | 47
 8 files changed, 127 insertions(+), 10 deletions(-)
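
The heart of the change is a three-way broker-version gate, applied identically in the fetcher and the producer sender (see both hunks below). A minimal standalone sketch of that mapping; `pick_protocol_version` is a hypothetical name, the real code inlines this logic rather than sharing a helper:

# Sketch of the broker api_version -> Fetch/Produce API version mapping
# this commit applies (hypothetical helper, not library code).
def pick_protocol_version(api_version):
    if api_version >= (0, 10):
        return 2   # v2 requests/responses; brokers use message format v1
    elif api_version == (0, 9):
        return 1
    else:
        return 0

assert pick_protocol_version((0, 10)) == 2
assert pick_protocol_version((0, 9)) == 1
assert pick_protocol_version((0, 8, 2)) == 0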
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index bf59775..e5a165e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -581,7 +581,12 @@ class Fetcher(six.Iterator):
             log.debug("Adding fetch request for partition %s at offset %d",
                       partition, position)
 
-        version = 1 if self.config['api_version'] >= (0, 9) else 0
+        if self.config['api_version'] >= (0, 10):
+            version = 2
+        elif self.config['api_version'] == (0, 9):
+            version = 1
+        else:
+            version = 0
         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
             requests[node_id] = FetchRequest[version](
diff --git a/kafka/errors.py b/kafka/errors.py
index dd64b04..a34ffef 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -310,6 +310,12 @@ class ClusterAuthorizationFailedError(BrokerResponseError):
                    ' use an inter-broker or administrative API.')
 
 
+class InvalidTimestampError(BrokerResponseError):
+    errno = 32
+    message = 'INVALID_TIMESTAMP'
+    description = ('The timestamp of the message is out of acceptable range.')
+
+
 class KafkaUnavailableError(KafkaError):
     pass
 
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7aa24b3..fc60e78 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -283,7 +283,8 @@ class KafkaProducer(object):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
-        self._accumulator = RecordAccumulator(**self.config)
+        message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+        self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
         self._metadata = client.cluster
         self._sender = Sender(client, self._metadata, self._accumulator,
                               **self.config)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9c36c9b..f10c34c 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -174,11 +174,16 @@ class Sender(threading.Thread):
                                              for batch in batches])
 
                 for topic, partitions in response.topics:
-                    for partition, error_code, offset in partitions:
+                    for partition_info in partitions:
+                        if response.API_VERSION < 2:
+                            partition, error_code, offset = partition_info
+                            ts = None
+                        else:
+                            partition, error_code, offset, ts = partition_info
                         tp = TopicPartition(topic, partition)
                         error = Errors.for_code(error_code)
                         batch = batches_by_partition[tp]
-                        self._complete_batch(batch, error, offset)
+                        self._complete_batch(batch, error, offset, ts)
 
         else:
             # this is the acks = 0 case, just complete all requests
@@ -258,7 +263,12 @@ class Sender(threading.Thread):
                 buf = batch.records.buffer()
                 produce_records_by_partition[topic][partition] = buf
 
-        version = 1 if self.config['api_version'] >= (0, 9) else 0
+        if self.config['api_version'] >= (0, 10):
+            version = 2
+        elif self.config['api_version'] == (0, 9):
+            version = 1
+        else:
+            version = 0
         return ProduceRequest[version](
             required_acks=acks,
             timeout=timeout,
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 6aba972..0542ad2 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -32,6 +32,12 @@
     )
 
 
+class FetchResponse_v2(Struct):
+    API_KEY = 1
+    API_VERSION = 2
+    SCHEMA = FetchResponse_v1.SCHEMA  # message format changed internally
+
+
 class FetchRequest_v0(Struct):
     API_KEY = 1
     API_VERSION = 0
@@ -56,5 +62,12 @@ class FetchRequest_v1(Struct):
     SCHEMA = FetchRequest_v0.SCHEMA
 
 
-FetchRequest = [FetchRequest_v0, FetchRequest_v1]
-FetchResponse = [FetchResponse_v0, FetchResponse_v1]
+class FetchRequest_v2(Struct):
+    API_KEY = 1
+    API_VERSION = 2
+    RESPONSE_TYPE = FetchResponse_v2
+    SCHEMA = FetchRequest_v1.SCHEMA
+
+
+FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2]
+FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2]
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index e0b8622..3775796 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -30,6 +30,21 @@ class ProduceResponse_v1(Struct):
     )
 
 
+class ProduceResponse_v2(Struct):
+    API_KEY = 0
+    API_VERSION = 2
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64),
+                ('timestamp', Int64))))),
+        ('throttle_time_ms', Int32)
+    )
+
+
 class ProduceRequest_v0(Struct):
     API_KEY = 0
     API_VERSION = 0
@@ -52,5 +67,12 @@ class ProduceRequest_v1(Struct):
     SCHEMA = ProduceRequest_v0.SCHEMA
 
 
-ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
-ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]
+class ProduceRequest_v2(Struct):
+    API_KEY = 0
+    API_VERSION = 2
+    RESPONSE_TYPE = ProduceResponse_v2
+    SCHEMA = ProduceRequest_v1.SCHEMA
+
+
+ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
+ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index bf4a3a9..7e529bc 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -15,7 +15,7 @@ from kafka.structs import TopicPartition, OffsetAndMetadata
 
 @pytest.fixture
 def client(mocker):
-    return mocker.Mock(spec=KafkaClient)
+    return mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))
 
 
 @pytest.fixture
@@ -71,6 +71,19 @@ def test_init_fetches(fetcher, mocker):
     assert len(ret) == len(fetch_requests)
 
 
+@pytest.mark.parametrize(("api_version", "fetch_version"), [
+    ((0, 10), 2),
+    ((0, 9), 1),
+    ((0, 8), 0)
+])
+def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
+    fetcher._client.in_flight_request_count.return_value = 0
+    fetcher.config['api_version'] = api_version
+    by_node = fetcher._create_fetch_requests()
+    requests = by_node.values()
+    assert all([isinstance(r, FetchRequest[fetch_version]) for r in requests])
+
+
 def test_update_fetch_positions(fetcher, mocker):
     mocker.patch.object(fetcher, '_reset_offset')
     partition = TopicPartition('foobar', 0)
diff --git a/test/test_sender.py b/test/test_sender.py
new file mode 100644
index 0000000..bb9068e
--- /dev/null
+++ b/test/test_sender.py
@@ -0,0 +1,47 @@
+# pylint: skip-file
+from __future__ import absolute_import
+
+import io
+
+import pytest
+
+from kafka.client_async import KafkaClient
+from kafka.cluster import ClusterMetadata
+from kafka.producer.buffer import MessageSetBuffer
+from kafka.producer.sender import Sender
+from kafka.producer.record_accumulator import RecordAccumulator, RecordBatch
+import kafka.errors as Errors
+from kafka.future import Future
+from kafka.protocol.produce import ProduceRequest
+from kafka.structs import TopicPartition, OffsetAndMetadata
+
+
+@pytest.fixture
+def client(mocker):
+    _cli = mocker.Mock(spec=KafkaClient(bootstrap_servers=[]))
+    _cli.cluster = mocker.Mock(spec=ClusterMetadata())
+    return _cli
+
+
+@pytest.fixture
+def accumulator():
+    return RecordAccumulator()
+
+
+@pytest.fixture
+def sender(client, accumulator):
+    return Sender(client, client.cluster, accumulator)
+
+
+@pytest.mark.parametrize(("api_version", "produce_version"), [
+    ((0, 10), 2),
+    ((0, 9), 1),
+    ((0, 8), 0)
+])
+def test_produce_request(sender, mocker, api_version, produce_version):
+    sender.config['api_version'] = api_version
+    tp = TopicPartition('foo', 0)
+    records = MessageSetBuffer(io.BytesIO(), 100000)
+    batch = RecordBatch(tp, records)
+    produce_request = sender._produce_request(0, 0, 0, [batch])
+    assert isinstance(produce_request, ProduceRequest[produce_version])
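
Why the sender now unpacks partition tuples conditionally: v0/v1 produce responses return (partition, error_code, offset) per partition, while v2 appends the broker-assigned message timestamp (plus a top-level throttle_time_ms). A standalone sketch of that normalization under the same assumptions as the diff; `unpack_partition_info` is a hypothetical name, the real code inlines this in the sender's produce-response handler:

# Hypothetical helper mirroring the conditional unpack shown in the
# kafka/producer/sender.py hunk above; not library code.
def unpack_partition_info(api_version, partition_info):
    if api_version < 2:
        partition, error_code, offset = partition_info
        ts = None  # pre-v2 responses carry no timestamp
    else:
        partition, error_code, offset, ts = partition_info
    return partition, error_code, offset, ts

# v1 entry has three fields; the normalized tuple gets ts=None
assert unpack_partition_info(1, (0, 0, 42)) == (0, 0, 42, None)
# v2 entry appends a millisecond timestamp
assert unpack_partition_info(2, (0, 0, 42, 1463938526000)) == (0, 0, 42, 1463938526000)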