summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer/fetcher.py7
-rw-r--r--kafka/errors.py6
-rw-r--r--kafka/producer/kafka.py3
-rw-r--r--kafka/producer/sender.py16
-rw-r--r--kafka/protocol/fetch.py17
-rw-r--r--kafka/protocol/produce.py26
6 files changed, 66 insertions, 9 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index bf59775..e5a165e 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -581,7 +581,12 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d",
partition, position)
- version = 1 if self.config['api_version'] >= (0, 9) else 0
+ if self.config['api_version'] >= (0, 10):
+ version = 2
+ elif self.config['api_version'] == (0, 9):
+ version = 1
+ else:
+ version = 0
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[version](
diff --git a/kafka/errors.py b/kafka/errors.py
index dd64b04..a34ffef 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -310,6 +310,12 @@ class ClusterAuthorizationFailedError(BrokerResponseError):
' use an inter-broker or administrative API.')
+class InvalidTimestampError(BrokerResponseError):
+ errno = 32
+ message = 'INVALID_TIMESTAMP'
+ description = ('The timestamp of the message is out of acceptable range.')
+
+
class KafkaUnavailableError(KafkaError):
pass
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7aa24b3..fc60e78 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -283,7 +283,8 @@ class KafkaProducer(object):
if self.config['compression_type'] == 'lz4':
assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
- self._accumulator = RecordAccumulator(**self.config)
+ message_version = 1 if self.config['api_version'] >= (0, 10) else 0
+ self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._metadata = client.cluster
self._sender = Sender(client, self._metadata, self._accumulator,
**self.config)
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index 9c36c9b..f10c34c 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -174,11 +174,16 @@ class Sender(threading.Thread):
for batch in batches])
for topic, partitions in response.topics:
- for partition, error_code, offset in partitions:
+ for partition_info in partitions:
+ if response.API_VERSION < 2:
+ partition, error_code, offset = partition_info
+ ts = None
+ else:
+ partition, error_code, offset, ts = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = batches_by_partition[tp]
- self._complete_batch(batch, error, offset)
+ self._complete_batch(batch, error, offset, ts)
else:
# this is the acks = 0 case, just complete all requests
@@ -258,7 +263,12 @@ class Sender(threading.Thread):
buf = batch.records.buffer()
produce_records_by_partition[topic][partition] = buf
- version = 1 if self.config['api_version'] >= (0, 9) else 0
+ if self.config['api_version'] >= (0, 10):
+ version = 2
+ elif self.config['api_version'] == (0, 9):
+ version = 1
+ else:
+ version = 0
return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 6aba972..0542ad2 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -32,6 +32,12 @@ class FetchResponse_v1(Struct):
)
+class FetchResponse_v2(Struct):
+ API_KEY = 1
+ API_VERSION = 2
+ SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
+
+
class FetchRequest_v0(Struct):
API_KEY = 1
API_VERSION = 0
@@ -56,5 +62,12 @@ class FetchRequest_v1(Struct):
SCHEMA = FetchRequest_v0.SCHEMA
-FetchRequest = [FetchRequest_v0, FetchRequest_v1]
-FetchResponse = [FetchResponse_v0, FetchResponse_v1]
+class FetchRequest_v2(Struct):
+ API_KEY = 1
+ API_VERSION = 2
+ RESPONSE_TYPE = FetchResponse_v2
+ SCHEMA = FetchRequest_v1.SCHEMA
+
+
+FetchRequest = [FetchRequest_v0, FetchRequest_v1, FetchRequest_v2]
+FetchResponse = [FetchResponse_v0, FetchResponse_v1, FetchResponse_v2]
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
index e0b8622..3775796 100644
--- a/kafka/protocol/produce.py
+++ b/kafka/protocol/produce.py
@@ -30,6 +30,21 @@ class ProduceResponse_v1(Struct):
)
+class ProduceResponse_v2(Struct):
+ API_KEY = 0
+ API_VERSION = 2
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offset', Int64),
+ ('timestamp', Int64))))),
+ ('thottle_time_ms', Int32)
+ )
+
+
class ProduceRequest_v0(Struct):
API_KEY = 0
API_VERSION = 0
@@ -52,5 +67,12 @@ class ProduceRequest_v1(Struct):
SCHEMA = ProduceRequest_v0.SCHEMA
-ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1]
-ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1]
+class ProduceRequest_v2(Struct):
+ API_KEY = 0
+ API_VERSION = 2
+ RESPONSE_TYPE = ProduceResponse_v2
+ SCHEMA = ProduceRequest_v1.SCHEMA
+
+
+ProduceRequest = [ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2]
+ProduceResponse = [ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2]