diff options
author | Heikki Nousiainen <htn@aiven.io> | 2018-08-14 15:38:42 +0300 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-09-27 15:22:03 -0700 |
commit | 0ca4313170df2657456009af5550942ace9f1a81 (patch) | |
tree | cb39f5b2e6823a8dac73ca0313ff0a68bd96c572 | |
parent | 9d30ab8bdbbd7e722ba4a96a6883a965d577d3cc (diff) | |
download | kafka-python-0ca4313170df2657456009af5550942ace9f1a81.tar.gz |
Expose record headers in ConsumerRecords
-rw-r--r-- | README.rst | 5 | ||||
-rw-r--r-- | kafka/consumer/fetcher.py | 8 | ||||
-rw-r--r-- | test/test_fetcher.py | 6 |
3 files changed, 13 insertions, 6 deletions
@@ -70,6 +70,11 @@ that expose basic message attributes: topic, partition, offset, key, and value: >>> for msg in consumer: ... assert isinstance(msg.value, dict) +>>> # Access record headers. The returned value is a list of tuples +>>> # with str, bytes for key and value +>>> for msg in consumer: +... print (msg.headers) + >>> # Get consumer metrics >>> metrics = consumer.metrics() diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 6ec1b71..7d58b7c 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -29,7 +29,7 @@ READ_COMMITTED = 1 ConsumerRecord = collections.namedtuple("ConsumerRecord", ["topic", "partition", "offset", "timestamp", "timestamp_type", - "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) + "key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"]) CompletedFetch = collections.namedtuple("CompletedFetch", @@ -456,10 +456,12 @@ class Fetcher(six.Iterator): value = self._deserialize( self.config['value_deserializer'], tp.topic, record.value) + headers = record.headers + header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1 yield ConsumerRecord( tp.topic, tp.partition, record.offset, record.timestamp, - record.timestamp_type, key, value, record.checksum, - key_size, value_size) + record.timestamp_type, key, value, headers, record.checksum, + key_size, value_size, header_size) batch = records.next_batch() diff --git a/test/test_fetcher.py b/test/test_fetcher.py index c821018..e37a70d 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -509,7 +509,7 @@ def test_partition_records_offset(): fetch_offset = 123 tp = TopicPartition('foo', 0) messages = [ConsumerRecord(tp.topic, tp.partition, i, - None, None, 'key', 'value', 'checksum', 0, 0) + None, None, 'key', 'value', [], 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) assert len(records) > 0 @@ -534,7 +534,7 @@ def test_partition_records_no_fetch_offset(): fetch_offset = 123 tp = TopicPartition('foo', 0) messages = [ConsumerRecord(tp.topic, tp.partition, i, - None, None, 'key', 'value', 'checksum', 0, 0) + None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end)] records = Fetcher.PartitionRecords(fetch_offset, None, messages) assert len(records) == 0 @@ -549,7 +549,7 @@ def test_partition_records_compacted_offset(): fetch_offset = 42 tp = TopicPartition('foo', 0) messages = [ConsumerRecord(tp.topic, tp.partition, i, - None, None, 'key', 'value', 'checksum', 0, 0) + None, None, 'key', 'value', None, 'checksum', 0, 0, -1) for i in range(batch_start, batch_end) if i != fetch_offset] records = Fetcher.PartitionRecords(fetch_offset, None, messages) assert len(records) == batch_end - fetch_offset - 1 |