Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r--  kafka/producer/kafka.py  18
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 24b58fe..4fc7bc6 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -513,7 +513,7 @@ class KafkaProducer(object):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)
- def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+ def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
"""Publish a message to a topic.
Arguments:
@@ -534,6 +534,8 @@ class KafkaProducer(object):
partition (but if key is None, partition is chosen randomly).
Must be type bytes, or be serializable to bytes via configured
key_serializer.
+ headers (optional): a list of header key/value pairs. List items
+ are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
@@ -563,13 +565,18 @@ class KafkaProducer(object):
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
- message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
+ if headers is None:
+ headers = []
+ assert type(headers) == list
+ assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+
+ message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
self._ensure_valid_record_size(message_size)
tp = TopicPartition(topic, partition)
- log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
+ log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
result = self._accumulator.append(tp, timestamp_ms,
- key_bytes, value_bytes,
+ key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
future, batch_is_full, new_batch_created = result
@@ -588,7 +595,8 @@ class KafkaProducer(object):
FutureProduceResult(TopicPartition(topic, partition)),
-1, None, None,
len(key_bytes) if key_bytes is not None else -1,
- len(value_bytes) if value_bytes is not None else -1
+ len(value_bytes) if value_bytes is not None else -1,
+ sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
).failure(e)
def flush(self, timeout=None):
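Taken together, the patch threads the new headers argument through the full send path: it is validated as a list of (str, bytes) tuples, included in the record size estimate, logged, passed to the accumulator append, and counted toward the serialized header size reported on the error path. Below is a minimal usage sketch of the updated API; the bootstrap server and topic name are placeholders rather than anything from the patch, and note that record headers require the v2 message format (Kafka brokers 0.11+).

from kafka import KafkaProducer

# Placeholder broker address; adjust for your cluster.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Headers must be a list of (str, bytes) tuples, matching the
# asserts added in this patch.
future = producer.send(
    "my-topic",  # hypothetical topic name
    key=b"record-key",
    value=b"payload",
    headers=[
        ("trace-id", b"abc123"),
        ("content-type", b"application/json"),
    ],
)

# Block until the broker acknowledges the record (raises on failure).
record_metadata = future.get(timeout=10)
producer.flush()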