diff options
author | Dana Powers <dana.powers@rd.io> | 2014-12-11 16:35:15 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:43:45 -0800 |
commit | ed893c3bcfdf54f440b98b958c86fc0c13573b6f (patch) | |
tree | 10bf13e8b3efa1ffae30f057dadf4e8655a2063a /kafka/consumer/kafka.py | |
parent | 209a8f28e6df2c5be8117b3bbb1b3188f1e29adc (diff) | |
download | kafka-python-ed893c3bcfdf54f440b98b958c86fc0c13573b6f.tar.gz |
Use kafka.util.kafka_bytestring to encode utf-8 when necessary
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r-- | kafka/consumer/kafka.py | 21 |
1 files changed, 6 insertions, 15 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 069ad06..f4eb6cb 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -15,6 +15,7 @@ from kafka.common import ( OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout, FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError ) +from kafka.util import kafka_bytestring logger = logging.getLogger(__name__) @@ -225,18 +226,14 @@ class KafkaConsumer(object): # Topic name str -- all partitions if isinstance(arg, (six.string_types, six.binary_type)): - topic = arg - if isinstance(topic, six.string_types): - topic = topic.encode('utf-8') + topic = kafka_bytestring(arg) for partition in self._client.get_partition_ids_for_topic(arg): self._consume_topic_partition(topic, partition) # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - topic = arg[0] - if isinstance(topic, six.string_types): - topic = topic.encode('utf-8') + topic = kafka_bytestring(arg[0]) partition = arg[1] if len(arg) == 3: offset = arg[2] @@ -249,9 +246,7 @@ class KafkaConsumer(object): # key can be string (a topic) if isinstance(key, (six.string_types, six.binary_type)): - topic = key - if isinstance(topic, six.string_types): - topic = topic.encode('utf-8') + topic = kafka_bytestring(key) # topic: partition if isinstance(value, int): @@ -267,9 +262,7 @@ class KafkaConsumer(object): # (topic, partition): offset elif isinstance(key, tuple): - topic = key[0] - if isinstance(topic, six.string_types): - topic = topic.encode('utf-8') + topic = kafka_bytestring(key[0]) partition = key[1] self._consume_topic_partition(topic, partition) self._offsets.fetch[key] = value @@ -562,9 +555,7 @@ class KafkaConsumer(object): # def _consume_topic_partition(self, topic, partition): - if not isinstance(topic, six.binary_type): - raise KafkaConfigurationError('Unknown topic type (%s) ' - '-- expected bytes' % type(topic)) + topic = kafka_bytestring(topic) if not isinstance(partition, int): raise KafkaConfigurationError('Unknown partition type (%s) ' '-- expected int' % type(partition)) |