Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--   kafka/producer/kafka.py   15
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7e8f625..7aa24b3 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -347,7 +347,7 @@ class KafkaProducer(object):
max_wait = self.config['max_block_ms'] / 1000.0
return self._wait_on_metadata(topic, max_wait)
- def send(self, topic, value=None, key=None, partition=None):
+ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
@@ -368,6 +368,8 @@ class KafkaProducer(object):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
+ timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
+ to use as the message timestamp. Defaults to current time.
Returns:
FutureRecordMetadata: resolves to RecordMetadata
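
As a quick illustration of the new parameter, here is a minimal usage sketch; the bootstrap server address, topic name, key, and payload below are placeholders and not part of this change:

    from kafka import KafkaProducer
    import time

    # Placeholder broker address and topic; adjust for your setup.
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    # Pass an explicit event time in epoch milliseconds, or omit
    # timestamp_ms to let send() default to the current time.
    event_time_ms = int(time.time() * 1000)
    future = producer.send('my-topic', key=b'my-key', value=b'payload',
                           timestamp_ms=event_time_ms)
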
@@ -396,8 +398,11 @@ class KafkaProducer(object):
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
+ if timestamp_ms is None:
+ timestamp_ms = int(time.time() * 1000)
log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
- result = self._accumulator.append(tp, key_bytes, value_bytes,
+ result = self._accumulator.append(tp, timestamp_ms,
+ key_bytes, value_bytes,
self.config['max_block_ms'])
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
@@ -416,8 +421,10 @@ class KafkaProducer(object):
except Exception as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
- FutureProduceResult(TopicPartition(topic, partition)),
- -1).failure(e)
+ FutureProduceResult(
+ TopicPartition(topic, partition)),
+ -1, None
+ ).failure(e)
def flush(self, timeout=None):
"""