diff options
-rw-r--r-- | kafka/producer/record_accumulator.py | 8 |
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 8fe6abb..7610fe2 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -55,9 +55,13 @@ class RecordBatch(object): if not self.records.has_room_for(key, value): return None - msg = Message(value, key=key, magic=self.message_version) + if self.message_version == 0: + msg = Message(value, key=key, magic=self.message_version) + else: + msg = Message(value, key=key, magic=self.message_version, + timestamp=timestamp_ms) record_size = self.records.append(self.record_count, msg) - checksum = msg.crc # crc is recalculated during records.append() + checksum = msg.crc # crc is recalculated during records.append() self.max_record_size = max(self.max_record_size, record_size) self.last_append = time.time() future = FutureRecordMetadata(self.produce_future, self.record_count, |