diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/consumer/fetcher.py | 7 | ||||
-rw-r--r-- | kafka/errors.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 3 | ||||
-rw-r--r-- | kafka/producer/sender.py | 16 | ||||
-rw-r--r-- | kafka/protocol/fetch.py | 17 | ||||
-rw-r--r-- | kafka/protocol/produce.py | 26 |
6 files changed, 66 insertions, 9 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index bf59775..e5a165e 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -581,7 +581,12 @@ class Fetcher(six.Iterator): log.debug("Adding fetch request for partition %s at offset %d", partition, position) - version = 1 if self.config['api_version'] >= (0, 9) else 0 + if self.config['api_version'] >= (0, 10): + version = 2 + elif self.config['api_version'] == (0, 9): + version = 1 + else: + version = 0 requests = {} for node_id, partition_data in six.iteritems(fetchable): requests[node_id] = FetchRequest[version]( diff --git a/kafka/errors.py b/kafka/errors.py index dd64b04..a34ffef 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -310,6 +310,12 @@ class ClusterAuthorizationFailedError(BrokerResponseError): ' use an inter-broker or administrative API.') +class InvalidTimestampError(BrokerResponseError): + errno = 32 + message = 'INVALID_TIMESTAMP' + description = ('The timestamp of the message is out of acceptable range.') + + class KafkaUnavailableError(KafkaError): pass diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 7aa24b3..fc60e78 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -283,7 +283,8 @@ class KafkaProducer(object): if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' - self._accumulator = RecordAccumulator(**self.config) + message_version = 1 if self.config['api_version'] >= (0, 10) else 0 + self._accumulator = RecordAccumulator(message_version=message_version, **self.config) self._metadata = client.cluster self._sender = Sender(client, self._metadata, self._accumulator, **self.config) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 9c36c9b..f10c34c 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -174,11 +174,16 @@ class Sender(threading.Thread): for batch in batches]) for topic, partitions in response.topics: - for partition, error_code, offset in partitions: + for partition_info in partitions: + if response.API_VERSION < 2: + partition, error_code, offset = partition_info + ts = None + else: + partition, error_code, offset, ts = partition_info tp = TopicPartition(topic, partition) error = Errors.for_code(error_code) batch = batches_by_partition[tp] - self._complete_batch(batch, error, offset) + self._complete_batch(batch, error, offset, ts) else: # this is the acks = 0 case, just complete all requests @@ -258,7 +263,12 @@ class Sender(threading.Thread): buf = batch.records.buffer() produce_records_by_partition[topic][partition] = buf - version = 1 if self.config['api_version'] >= (0, 9) else 0 + if self.config['api_version'] >= (0, 10): + version = 2 + elif self.config['api_version'] == (0, 9): + version = 1 + else: + version = 0 return ProduceRequest[version]( required_acks=acks, timeout=timeout, diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index 6aba972..0542ad2 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -32,6 +32,12 @@ class FetchResponse_v1(Struct): ) +class FetchResponse_v2(Struct): + API_KEY = 1 + API_VERSION = 2 + SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally + + class FetchRequest_v0(Struct): API_KEY = 1 API_VERSION = 0 @@ -56,5 +62,12 @@ class FetchRequest_v1(Struct): SCHEMA = FetchRequest_v0.SCHEMA -FetchRequest = [FetchRequest_v0, FetchRequest_v1] -FetchResponse = [FetchResponse_v0, FetchResponse_v1] +class FetchRequest_v2(Struct): + API_KEY = 1 + API_VERSION = 2 + RESPONSE_TYPE = FetchResponse_v2 + SCHEMA = FetchRequest_v1.SCHEMA + + +FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2] +FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2] diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index e0b8622..3775796 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -30,6 +30,21 @@ class ProduceResponse_v1(Struct): ) +class ProduceResponse_v2(Struct): + API_KEY = 0 + API_VERSION = 2 + SCHEMA = Schema( + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('offset', Int64), + ('timestamp', Int64))))), + ('thottle_time_ms', Int32) + ) + + class ProduceRequest_v0(Struct): API_KEY = 0 API_VERSION = 0 @@ -52,5 +67,12 @@ class ProduceRequest_v1(Struct): SCHEMA = ProduceRequest_v0.SCHEMA -ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1] -ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1] +class ProduceRequest_v2(Struct): + API_KEY = 0 + API_VERSION = 2 + RESPONSE_TYPE = ProduceResponse_v2 + SCHEMA = ProduceRequest_v1.SCHEMA + + +ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2] +ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2] |