summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/record_accumulator.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 8fe6abb..7610fe2 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -55,9 +55,13 @@ class RecordBatch(object):
if not self.records.has_room_for(key, value):
return None
- msg = Message(value, key=key, magic=self.message_version)
+ if self.message_version == 0:
+ msg = Message(value, key=key, magic=self.message_version)
+ else:
+ msg = Message(value, key=key, magic=self.message_version,
+ timestamp=timestamp_ms)
record_size = self.records.append(self.record_count, msg)
- checksum = msg.crc # crc is recalculated during records.append()
+ checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,