summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py21
1 files changed, 6 insertions, 15 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 069ad06..f4eb6cb 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -15,6 +15,7 @@ from kafka.common import (
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
+from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
@@ -225,18 +226,14 @@ class KafkaConsumer(object):
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
- topic = arg
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(arg)
for partition in self._client.get_partition_ids_for_topic(arg):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- topic = arg[0]
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(arg[0])
partition = arg[1]
if len(arg) == 3:
offset = arg[2]
@@ -249,9 +246,7 @@ class KafkaConsumer(object):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
- topic = key
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(key)
# topic: partition
if isinstance(value, int):
@@ -267,9 +262,7 @@ class KafkaConsumer(object):
# (topic, partition): offset
elif isinstance(key, tuple):
- topic = key[0]
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(key[0])
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[key] = value
@@ -562,9 +555,7 @@ class KafkaConsumer(object):
#
def _consume_topic_partition(self, topic, partition):
- if not isinstance(topic, six.binary_type):
- raise KafkaConfigurationError('Unknown topic type (%s) '
- '-- expected bytes' % type(topic))
+ topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))