summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Heikki Nousiainen <htn@aiven.io> 2018-08-14 15:38:42 +0300
committer: Jeff Widman <jeff@jeffwidman.com> 2018-09-27 15:22:03 -0700
commit: 0ca4313170df2657456009af5550942ace9f1a81 (patch)
tree: cb39f5b2e6823a8dac73ca0313ff0a68bd96c572
parent: 9d30ab8bdbbd7e722ba4a96a6883a965d577d3cc (diff)
download: kafka-python-0ca4313170df2657456009af5550942ace9f1a81.tar.gz
Expose record headers in ConsumerRecords
-rw-r--r-- README.rst | 5
-rw-r--r-- kafka/consumer/fetcher.py | 8
-rw-r--r-- test/test_fetcher.py | 6
3 files changed, 13 insertions, 6 deletions
diff --git a/README.rst b/README.rst
index dcade43..28cb7e7 100644
--- a/README.rst
+++ b/README.rst
@@ -70,6 +70,11 @@ that expose basic message attributes: topic, partition, offset, key, and value:
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
+>>> # Access record headers. The returned value is a list of tuples
+>>> # with str, bytes for key and value
+>>> for msg in consumer:
+... print (msg.headers)
+
>>> # Get consumer metrics
>>> metrics = consumer.metrics()
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 6ec1b71..7d58b7c 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -29,7 +29,7 @@ READ_COMMITTED = 1
ConsumerRecord = collections.namedtuple("ConsumerRecord",
["topic", "partition", "offset", "timestamp", "timestamp_type",
- "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
+ "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
CompletedFetch = collections.namedtuple("CompletedFetch",
@@ -456,10 +456,12 @@ class Fetcher(six.Iterator):
value = self._deserialize(
self.config['value_deserializer'],
tp.topic, record.value)
+ headers = record.headers
+ header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1
yield ConsumerRecord(
tp.topic, tp.partition, record.offset, record.timestamp,
- record.timestamp_type, key, value, record.checksum,
- key_size, value_size)
+ record.timestamp_type, key, value, headers, record.checksum,
+ key_size, value_size, header_size)
batch = records.next_batch()
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index c821018..e37a70d 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -509,7 +509,7 @@ def test_partition_records_offset():
fetch_offset = 123
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
- None, None, 'key', 'value', 'checksum', 0, 0)
+ None, None, 'key', 'value', [], 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) > 0
@@ -534,7 +534,7 @@ def test_partition_records_no_fetch_offset():
fetch_offset = 123
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
- None, None, 'key', 'value', 'checksum', 0, 0)
+ None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end)]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) == 0
@@ -549,7 +549,7 @@ def test_partition_records_compacted_offset():
fetch_offset = 42
tp = TopicPartition('foo', 0)
messages = [ConsumerRecord(tp.topic, tp.partition, i,
- None, None, 'key', 'value', 'checksum', 0, 0)
+ None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
for i in range(batch_start, batch_end) if i != fetch_offset]
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
assert len(records) == batch_end - fetch_offset - 1