summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTaras Voinarovskyi <voyn1991@gmail.com>2016-11-18 19:08:19 +0200
committerDana Powers <dana.powers@gmail.com>2016-11-18 09:08:19 -0800
commit6c9f7280c5adee9db0f2b766c54bd9e386a56f25 (patch)
tree12ccb06c808d1cfbf76ecedaca07eef573ef6ab5
parentf71cfc4607c0295a8e131576f8619c9f8ff8f66f (diff)
downloadkafka-python-6c9f7280c5adee9db0f2b766c54bd9e386a56f25.tar.gz
:wPass timestamp into Message, not just mimic it (#875)
-rw-r--r--kafka/producer/record_accumulator.py8
1 files changed, 6 insertions, 2 deletions
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 8fe6abb..7610fe2 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -55,9 +55,13 @@ class RecordBatch(object):
if not self.records.has_room_for(key, value):
return None
- msg = Message(value, key=key, magic=self.message_version)
+ if self.message_version == 0:
+ msg = Message(value, key=key, magic=self.message_version)
+ else:
+ msg = Message(value, key=key, magic=self.message_version,
+ timestamp=timestamp_ms)
record_size = self.records.append(self.record_count, msg)
- checksum = msg.crc # crc is recalculated during records.append()
+ checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,