diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-17 17:06:46 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-12-17 17:06:46 -0800 |
commit | 64347bd9faa0a314c3c49b48df941d3022f9e311 (patch) | |
tree | f6957cb1f95a7f2a951123975b205323d6d70618 /kafka/consumer | |
parent | 07e09c1c2ec6787fc7e4f3c2578d31b4a15d20bc (diff) | |
download | kafka-python-serialize_interface.tar.gz |
Add kafka.serializer interfacesserialize_interface
Diffstat (limited to 'kafka/consumer')
-rw-r--r-- | kafka/consumer/fetcher.py | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d09f9da..cda136d 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -15,6 +15,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.fetch import FetchRequest from kafka.protocol.message import PartialMessage from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy +from kafka.serializer import Deserializer from kafka.structs import TopicPartition log = logging.getLogger(__name__) @@ -507,7 +508,12 @@ class Fetcher(six.Iterator): if absolute_base_offset >= 0: inner_offset += absolute_base_offset - key, value = self._deserialize(inner_msg) + key = self._deserialize( + self.config['key_deserializer'], + tp.topic, inner_msg.key) + value = self._deserialize( + self.config['value_deserializer'], + tp.topic, inner_msg.value) yield ConsumerRecord(tp.topic, tp.partition, inner_offset, inner_timestamp, msg.timestamp_type, key, value, inner_msg.crc, @@ -515,7 +521,12 @@ class Fetcher(six.Iterator): len(inner_msg.value) if inner_msg.value is not None else -1) else: - key, value = self._deserialize(msg) + key = self._deserialize( + self.config['key_deserializer'], + tp.topic, msg.key) + value = self._deserialize( + self.config['value_deserializer'], + tp.topic, msg.value) yield ConsumerRecord(tp.topic, tp.partition, offset, msg.timestamp, msg.timestamp_type, key, value, msg.crc, @@ -541,16 +552,12 @@ class Fetcher(six.Iterator): self._iterator = None raise - def _deserialize(self, msg): - if self.config['key_deserializer']: - key = self.config['key_deserializer'](msg.key) # pylint: disable-msg=not-callable - else: - key = msg.key - if self.config['value_deserializer']: - value = self.config['value_deserializer'](msg.value) # pylint: disable-msg=not-callable - else: - value = msg.value - return key, value + def _deserialize(self, f, topic, bytes_): + if not f: + return bytes_ + if isinstance(f, Deserializer): + return f.deserialize(topic, bytes_) + return f(bytes_) def _send_offset_request(self, partition, timestamp): """Fetch a single offset before the given timestamp for the partition. |