summaryrefslogtreecommitdiff
path: root/kafka/producer
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2017-10-22 16:56:28 +0900
committerGitHub <noreply@github.com>2017-10-22 16:56:28 +0900
commita345dcd2ca1b0f8934864c512a4a78c65034dd36 (patch)
tree0b7ea8c67b015f944b9a401f5e024a2eff7c7db9 /kafka/producer
parent4dbf34abce9b4addbb304520e2f692fbaef60ae5 (diff)
downloadkafka-python-a345dcd2ca1b0f8934864c512a4a78c65034dd36.tar.gz
Fix timestamp not passed to RecordMetadata (#1273)
* Fix timestamp not being passed to RecordMetadata properly * Add more tests for LegacyBatch * Fix producer test for recordmetadata
Diffstat (limited to 'kafka/producer')
-rw-r--r--kafka/producer/future.py4
-rw-r--r--kafka/producer/record_accumulator.py11
2 files changed, 8 insertions, 7 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index bc50d0d..e39a0a9 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -44,7 +44,9 @@ class FutureRecordMetadata(Future):
(relative_offset, timestamp_ms, checksum,
serialized_key_size, serialized_value_size) = self.args
- if produce_timestamp_ms is not None:
+ # None is when Broker does not support the API (<0.10) and
+ # -1 is when the broker is configured for CREATE_TIME timestamps
+ if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
timestamp_ms = produce_timestamp_ms
if offset != -1 and relative_offset is not None:
offset += relative_offset
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 716ae65..5158474 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -56,15 +56,14 @@ class ProducerBatch(object):
return self.records.next_offset()
def try_append(self, timestamp_ms, key, value):
- offset = self.records.next_offset()
- checksum, record_size = self.records.append(timestamp_ms, key, value)
- if record_size == 0:
+ metadata = self.records.append(timestamp_ms, key, value)
+ if metadata is None:
return None
- self.max_record_size = max(self.max_record_size, record_size)
+ self.max_record_size = max(self.max_record_size, metadata.size)
self.last_append = time.time()
- future = FutureRecordMetadata(self.produce_future, offset,
- timestamp_ms, checksum,
+ future = FutureRecordMetadata(self.produce_future, metadata.offset,
+ metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
len(value) if value is not None else -1)
return future