diff options
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 7e8f625..7aa24b3 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -347,7 +347,7 @@ class KafkaProducer(object): max_wait = self.config['max_block_ms'] / 1000.0 return self._wait_on_metadata(topic, max_wait) - def send(self, topic, value=None, key=None, partition=None): + def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): """Publish a message to a topic. Arguments: @@ -368,6 +368,8 @@ class KafkaProducer(object): partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer. + timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC) + to use as the message timestamp. Defaults to current time. Returns: FutureRecordMetadata: resolves to RecordMetadata @@ -396,8 +398,11 @@ class KafkaProducer(object): self._ensure_valid_record_size(message_size) tp = TopicPartition(topic, partition) + if timestamp_ms is None: + timestamp_ms = int(time.time() * 1000) log.debug("Sending (key=%s value=%s) to %s", key, value, tp) - result = self._accumulator.append(tp, key_bytes, value_bytes, + result = self._accumulator.append(tp, timestamp_ms, + key_bytes, value_bytes, self.config['max_block_ms']) future, batch_is_full, new_batch_created = result if batch_is_full or new_batch_created: @@ -416,8 +421,10 @@ class KafkaProducer(object): except Exception as e: log.debug("Exception occurred during message send: %s", e) return FutureRecordMetadata( - FutureProduceResult(TopicPartition(topic, partition)), - -1).failure(e) + FutureProduceResult( + TopicPartition(topic, partition)), + -1, None + ).failure(e) def flush(self, timeout=None): """ |