diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-12-19 11:27:23 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-12-19 11:27:23 -0800 |
commit | 655953fdac787c1a140cc641502983b6676b13c5 (patch) | |
tree | 7a926445d44f96dfcb2a53d220f49562a9906168 /kafka/producer/kafka.py | |
parent | f6291e655d556ed7e0eecdad456f4e28b01b8d2b (diff) | |
download | kafka-python-655953fdac787c1a140cc641502983b6676b13c5.tar.gz |
Add kafka.serializer interfaces (#912)
Diffstat (limited to 'kafka/producer/kafka.py')
-rw-r--r-- | kafka/producer/kafka.py | 25 |
1 files changed, 13 insertions, 12 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 1d943c6..785919b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -13,6 +13,7 @@ from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet +from ..serializer import Serializer from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult from .record_accumulator import AtomicInteger, RecordAccumulator @@ -485,7 +486,12 @@ class KafkaProducer(object): # available self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) - key_bytes, value_bytes = self._serialize(topic, key, value) + key_bytes = self._serialize( + self.config['key_serializer'], + topic, key) + value_bytes = self._serialize( + self.config['value_serializer'], + topic, value) partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) @@ -606,17 +612,12 @@ class KafkaProducer(object): else: log.debug("_wait_on_metadata woke after %s secs.", elapsed) - def _serialize(self, topic, key, value): - # pylint: disable-msg=not-callable - if self.config['key_serializer']: - serialized_key = self.config['key_serializer'](key) - else: - serialized_key = key - if self.config['value_serializer']: - serialized_value = self.config['value_serializer'](value) - else: - serialized_value = value - return serialized_key, serialized_value + def _serialize(self, f, topic, data): + if not f: + return data + if isinstance(f, Serializer): + return f.serialize(topic, data) + return f(data) def _partition(self, topic, partition, key, value, serialized_key, serialized_value): |