diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-17 17:06:46 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-12-17 17:06:46 -0800 |
commit | 64347bd9faa0a314c3c49b48df941d3022f9e311 (patch) | |
tree | f6957cb1f95a7f2a951123975b205323d6d70618 /kafka/producer/kafka.py | |
parent | 07e09c1c2ec6787fc7e4f3c2578d31b4a15d20bc (diff) | |
download | kafka-python-serialize_interface.tar.gz |
Add kafka.serializer interfacesserialize_interface
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 17f27ab..e19121b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet +from ..serializer import Serializer from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult from .record_accumulator import AtomicInteger, RecordAccumulator @@ -485,7 +486,12 @@ class KafkaProducer(object): # available self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) - key_bytes, value_bytes = self._serialize(topic, key, value) + key_bytes = self._serialize( + self.config['key_serializer'], + topic, key) + value_bytes = self._serialize( + self.config['value_serializer'], + topic, value) partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) @@ -606,17 +612,12 @@ class KafkaProducer(object): else: log.debug("_wait_on_metadata woke after %s secs.", elapsed) - def _serialize(self, topic, key, value): - # pylint: disable-msg=not-callable - if self.config['key_serializer']: - serialized_key = self.config['key_serializer'](key) - else: - serialized_key = key - if self.config['value_serializer']: - serialized_value = self.config['value_serializer'](value) - else: - serialized_value = value - return serialized_key, serialized_value + def _serialize(self, f, topic, data): + if not f: + return data + if isinstance(f, Serializer): + return f.serialize(topic, data) + return f(data) def _partition(self, topic, partition, key, value, serialized_key, serialized_value): |