summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-16 16:00:14 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:56 -0800
commit6e6ae533272ed32d150221534a16d588e42f9c51 (patch)
treed966bd227af98c30da9fd616bea4df4836146d99
parent391b53201f200ab246b78e76c6e7945c8af6846e (diff)
downloadkafka-python-6e6ae533272ed32d150221534a16d588e42f9c51.tar.gz
Use six for py3 support in KafkaConsumer
Log connection failures w/ traceback in kafka/client.py
-rw-r--r--kafka/client.py6
-rw-r--r--kafka/consumer/new.py52
2 files changed, 42 insertions, 16 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 8c78694..bc3d853 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -119,9 +119,9 @@ class KafkaClient(object):
response = conn.recv(requestId)
return decoder_fn(response)
- except Exception as e:
- log.warning("Could not send request [%r] to server %s:%i, "
- "trying next server: %s" % (requestId, host, port, e))
+ except Exception:
+ log.exception("Could not send request [%r] to server %s:%i, "
+ "trying next server" % (requestId, host, port))
raise KafkaUnavailableError("All servers failed to process request")
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index e0884d3..90770b5 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -43,6 +43,8 @@ DEFAULT_CONSUMER_CONFIG = {
'rebalance_backoff_ms': 2000,
}
+BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id')
+
class KafkaConsumer(object):
"""
@@ -153,6 +155,14 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('Unknown configuration key(s): ' +
str(list(configs.keys())))
+ # Handle str/bytes conversions
+ for config_key in BYTES_CONFIGURATION_KEYS:
+ if not (self._config[config_key] is None or
+ isinstance(self._config[config_key], six.binary_type)):
+ logger.warning("Converting configuration key '%s' to bytes" %
+ config_key)
+ self._config[config_key] = self._config[config_key].encode('utf-8')
+
if self._config['auto_commit_enable']:
if not self._config['group_id']:
raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)')
@@ -176,7 +186,7 @@ class KafkaConsumer(object):
Optionally specify offsets to start from
Accepts types:
- str: topic name (will consume all available partitions)
+ str (utf-8): topic name (will consume all available partitions)
tuple: (topic, partition)
dict: { topic: partition }
{ topic: [partition list] }
@@ -212,14 +222,20 @@ class KafkaConsumer(object):
for arg in topics:
# Topic name str -- all partitions
- if isinstance(arg, six.string_types):
+ if isinstance(arg, (six.string_types, six.binary_type)):
topic = arg
+ if isinstance(topic, six.string_types):
+ topic = topic.encode('utf-8')
+
for partition in self._client.get_partition_ids_for_topic(arg):
self._consume_topic_partition(topic, partition)
# (topic, partition [, offset]) tuple
elif isinstance(arg, tuple):
- (topic, partition) = arg[0:2]
+ topic = arg[0]
+ if isinstance(topic, six.string_types):
+ topic = topic.encode('utf-8')
+ partition = arg[1]
if len(arg) == 3:
offset = arg[2]
self._offsets.fetch[(topic, partition)] = offset
@@ -227,26 +243,33 @@ class KafkaConsumer(object):
# { topic: partitions, ... } dict
elif isinstance(arg, dict):
- for key, value in arg.iteritems():
+ for key, value in six.iteritems(arg):
# key can be string (a topic)
- if isinstance(key, six.string_types):
+ if isinstance(key, (six.string_types, six.binary_type)):
+ topic = key
+ if isinstance(topic, six.string_types):
+ topic = topic.encode('utf-8')
# topic: partition
if isinstance(value, int):
- self._consume_topic_partition(key, value)
+ self._consume_topic_partition(topic, value)
# topic: [ partition1, partition2, ... ]
elif isinstance(value, (list, tuple)):
for partition in value:
- self._consume_topic_partition(key, partition)
+ self._consume_topic_partition(topic, partition)
else:
raise KafkaConfigurationError('Unknown topic type (dict key must be '
'int or list/tuple of ints)')
# (topic, partition): offset
elif isinstance(key, tuple):
- self._consume_topic_partition(*key)
+ topic = key[0]
+ if isinstance(topic, six.string_types):
+ topic = topic.encode('utf-8')
+ partition = key[1]
+ self._consume_topic_partition(topic, partition)
self._offsets.fetch[key] = value
else:
@@ -300,7 +323,7 @@ class KafkaConsumer(object):
while True:
try:
- return self._get_message_iterator().next()
+ return six.next(self._get_message_iterator())
# Handle batch completion
except StopIteration:
@@ -337,7 +360,7 @@ class KafkaConsumer(object):
raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages')
fetches = []
- for topic_partition, offset in offsets.iteritems():
+ for topic_partition, offset in six.iteritems(offsets):
fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes))
# client.send_fetch_request will collect topic/partition requests by leader
@@ -494,7 +517,7 @@ class KafkaConsumer(object):
offsets = self._offsets.task_done
commits = []
- for topic_partition, task_done_offset in offsets.iteritems():
+ for topic_partition, task_done_offset in six.iteritems(offsets):
# Skip if None
if task_done_offset is None:
@@ -538,9 +561,9 @@ class KafkaConsumer(object):
#
def _consume_topic_partition(self, topic, partition):
- if not isinstance(topic, six.string_types):
+ if not isinstance(topic, six.binary_type):
raise KafkaConfigurationError('Unknown topic type (%s) '
- '-- expected string' % type(topic))
+ '-- expected bytes' % type(topic))
if not isinstance(partition, int):
raise KafkaConfigurationError('Unknown partition type (%s) '
'-- expected int' % type(partition))
@@ -677,6 +700,9 @@ class KafkaConsumer(object):
def __iter__(self):
return self
+ def __next__(self):
+ return self.next()
+
def _get_message_iterator(self):
# Fetch a new batch if needed
if self._msg_iter is None: