summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer/kafka.py21
-rw-r--r--kafka/util.py12
2 files changed, 18 insertions, 15 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 069ad06..f4eb6cb 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -15,6 +15,7 @@ from kafka.common import (
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
)
+from kafka.util import kafka_bytestring
logger = logging.getLogger(__name__)
@@ -225,18 +226,14 @@ class KafkaConsumer(object):
# Topic name str -- all partitions
if isinstance(arg, (six.string_types, six.binary_type)):
- topic = arg
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(arg)
for partition in self._client.get_partition_ids_for_topic(arg):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- topic = arg[0]
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(arg[0])
partition = arg[1]
if len(arg) == 3:
offset = arg[2]
@@ -249,9 +246,7 @@ class KafkaConsumer(object):
# key can be string (a topic)
if isinstance(key, (six.string_types, six.binary_type)):
- topic = key
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(key)
# topic: partition
if isinstance(value, int):
@@ -267,9 +262,7 @@ class KafkaConsumer(object):
# (topic, partition): offset
elif isinstance(key, tuple):
- topic = key[0]
- if isinstance(topic, six.string_types):
- topic = topic.encode('utf-8')
+ topic = kafka_bytestring(key[0])
partition = key[1]
self._consume_topic_partition(topic, partition)
self._offsets.fetch[key] = value
@@ -562,9 +555,7 @@ class KafkaConsumer(object):
#
def _consume_topic_partition(self, topic, partition):
- if not isinstance(topic, six.binary_type):
- raise KafkaConfigurationError('Unknown topic type (%s) '
- '-- expected bytes' % type(topic))
+ topic = kafka_bytestring(topic)
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
diff --git a/kafka/util.py b/kafka/util.py
index 1e03cf1..72ac521 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -86,6 +86,18 @@ def group_by_topic_and_partition(tuples):
return out
+def kafka_bytestring(s):
+ """
+ Takes a string or bytes instance
+ Returns bytes, encoding strings in utf-8 as necessary
+ """
+ if isinstance(s, six.binary_type):
+ return s
+ if isinstance(s, six.string_types):
+ return s.encode('utf-8')
+ raise TypeError(s)
+
+
class ReentrantTimer(object):
"""
A timer that can be restarted, unlike threading.Timer