diff options
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 18 |
1 files changed, 13 insertions, 5 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 24b58fe..4fc7bc6 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -513,7 +513,7 @@ class KafkaProducer(object): return LegacyRecordBatchBuilder.estimate_size_in_bytes( magic, self.config['compression_type'], key, value) - def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None): + def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None): """Publish a message to a topic. Arguments: @@ -534,6 +534,8 @@ class KafkaProducer(object): partition (but if key is None, partition is chosen randomly). Must be type bytes, or be serializable to bytes via configured key_serializer. + headers (optional): a list of header key value pairs. List items + are tuples of str key and bytes value. timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time. @@ -563,13 +565,18 @@ class KafkaProducer(object): partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) - message_size = self._estimate_size_in_bytes(key_bytes, value_bytes) + if headers is None: + headers = [] + assert type(headers) == list + assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers) + + message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers) self._ensure_valid_record_size(message_size) tp = TopicPartition(topic, partition) - log.debug("Sending (key=%r value=%r) to %s", key, value, tp) + log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp) result = self._accumulator.append(tp, timestamp_ms, - key_bytes, value_bytes, + key_bytes, value_bytes, headers, self.config['max_block_ms'], estimated_size=message_size) future, batch_is_full, new_batch_created = result @@ -588,7 +595,8 @@ class KafkaProducer(object): FutureProduceResult(TopicPartition(topic, partition)), -1, None, None, len(key_bytes) if key_bytes is not None else -1, - len(value_bytes) if value_bytes is not None else -1 + len(value_bytes) if value_bytes is not None else -1, + sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1, ).failure(e) def flush(self, timeout=None): |