diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-22 16:56:28 +0900 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-22 16:56:28 +0900 |
commit | a345dcd2ca1b0f8934864c512a4a78c65034dd36 (patch) | |
tree | 0b7ea8c67b015f944b9a401f5e024a2eff7c7db9 /kafka/producer | |
parent | 4dbf34abce9b4addbb304520e2f692fbaef60ae5 (diff) | |
download | kafka-python-a345dcd2ca1b0f8934864c512a4a78c65034dd36.tar.gz |
Fix timestamp not passed to RecordMetadata (#1273)
* Fix timestamp not being passed to RecordMetadata properly
* Add more tests for LegacyBatch
* Fix producer test for recordmetadata
Diffstat (limited to 'kafka/producer')
-rw-r--r-- | kafka/producer/future.py | 4 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 11 |
2 files changed, 8 insertions, 7 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py index bc50d0d..e39a0a9 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -44,7 +44,9 @@ class FutureRecordMetadata(Future): (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size) = self.args - if produce_timestamp_ms is not None: + # None is when Broker does not support the API (<0.10) and + # -1 is when the broker is configured for CREATE_TIME timestamps + if produce_timestamp_ms is not None and produce_timestamp_ms != -1: timestamp_ms = produce_timestamp_ms if offset != -1 and relative_offset is not None: offset += relative_offset diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 716ae65..5158474 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -56,15 +56,14 @@ class ProducerBatch(object): return self.records.next_offset() def try_append(self, timestamp_ms, key, value): - offset = self.records.next_offset() - checksum, record_size = self.records.append(timestamp_ms, key, value) - if record_size == 0: + metadata = self.records.append(timestamp_ms, key, value) + if metadata is None: return None - self.max_record_size = max(self.max_record_size, record_size) + self.max_record_size = max(self.max_record_size, metadata.size) self.last_append = time.time() - future = FutureRecordMetadata(self.produce_future, offset, - timestamp_ms, checksum, + future = FutureRecordMetadata(self.produce_future, metadata.offset, + metadata.timestamp, metadata.crc, len(key) if key is not None else -1, len(value) if value is not None else -1) return future |