diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-16 16:00:14 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-12-15 12:42:56 -0800 |
commit | 6e6ae533272ed32d150221534a16d588e42f9c51 (patch) | |
tree | d966bd227af98c30da9fd616bea4df4836146d99 | |
parent | 391b53201f200ab246b78e76c6e7945c8af6846e (diff) | |
download | kafka-python-6e6ae533272ed32d150221534a16d588e42f9c51.tar.gz |
Use six for py3 support in KafkaConsumer
Log connection failures w/ traceback in kafka/client.py
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/consumer/new.py | 52 |
2 files changed, 42 insertions, 16 deletions
diff --git a/kafka/client.py b/kafka/client.py index 8c78694..bc3d853 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -119,9 +119,9 @@ class KafkaClient(object): response = conn.recv(requestId) return decoder_fn(response) - except Exception as e: - log.warning("Could not send request [%r] to server %s:%i, " - "trying next server: %s" % (requestId, host, port, e)) + except Exception: + log.exception("Could not send request [%r] to server %s:%i, " + "trying next server" % (requestId, host, port)) raise KafkaUnavailableError("All servers failed to process request") diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index e0884d3..90770b5 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -43,6 +43,8 @@ DEFAULT_CONSUMER_CONFIG = { 'rebalance_backoff_ms': 2000, } +BYTES_CONFIGURATION_KEYS = ('client_id', 'group_id') + class KafkaConsumer(object): """ @@ -153,6 +155,14 @@ class KafkaConsumer(object): raise KafkaConfigurationError('Unknown configuration key(s): ' + str(list(configs.keys()))) + # Handle str/bytes conversions + for config_key in BYTES_CONFIGURATION_KEYS: + if not (self._config[config_key] is None or + isinstance(self._config[config_key], six.binary_type)): + logger.warning("Converting configuration key '%s' to bytes" % + config_key) + self._config[config_key] = self._config[config_key].encode('utf-8') + if self._config['auto_commit_enable']: if not self._config['group_id']: raise KafkaConfigurationError('KafkaConsumer configured to auto-commit without required consumer group (group_id)') @@ -176,7 +186,7 @@ class KafkaConsumer(object): Optionally specify offsets to start from Accepts types: - str: topic name (will consume all available partitions) + str (utf-8): topic name (will consume all available partitions) tuple: (topic, partition) dict: { topic: partition } { topic: [partition list] } @@ -212,14 +222,20 @@ class KafkaConsumer(object): for arg in topics: # Topic name str -- all partitions - if isinstance(arg, six.string_types): + if isinstance(arg, (six.string_types, six.binary_type)): topic = arg + if isinstance(topic, six.string_types): + topic = topic.encode('utf-8') + for partition in self._client.get_partition_ids_for_topic(arg): self._consume_topic_partition(topic, partition) # (topic, partition [, offset]) tuple elif isinstance(arg, tuple): - (topic, partition) = arg[0:2] + topic = arg[0] + if isinstance(topic, six.string_types): + topic = topic.encode('utf-8') + partition = arg[1] if len(arg) == 3: offset = arg[2] self._offsets.fetch[(topic, partition)] = offset @@ -227,26 +243,33 @@ class KafkaConsumer(object): # { topic: partitions, ... } dict elif isinstance(arg, dict): - for key, value in arg.iteritems(): + for key, value in six.iteritems(arg): # key can be string (a topic) - if isinstance(key, six.string_types): + if isinstance(key, (six.string_types, six.binary_type)): + topic = key + if isinstance(topic, six.string_types): + topic = topic.encode('utf-8') # topic: partition if isinstance(value, int): - self._consume_topic_partition(key, value) + self._consume_topic_partition(topic, value) # topic: [ partition1, partition2, ... ] elif isinstance(value, (list, tuple)): for partition in value: - self._consume_topic_partition(key, partition) + self._consume_topic_partition(topic, partition) else: raise KafkaConfigurationError('Unknown topic type (dict key must be ' 'int or list/tuple of ints)') # (topic, partition): offset elif isinstance(key, tuple): - self._consume_topic_partition(*key) + topic = key[0] + if isinstance(topic, six.string_types): + topic = topic.encode('utf-8') + partition = key[1] + self._consume_topic_partition(topic, partition) self._offsets.fetch[key] = value else: @@ -300,7 +323,7 @@ class KafkaConsumer(object): while True: try: - return self._get_message_iterator().next() + return six.next(self._get_message_iterator()) # Handle batch completion except StopIteration: @@ -337,7 +360,7 @@ class KafkaConsumer(object): raise KafkaConfigurationError('No fetch offsets found when calling fetch_messages') fetches = [] - for topic_partition, offset in offsets.iteritems(): + for topic_partition, offset in six.iteritems(offsets): fetches.append(FetchRequest(topic_partition[0], topic_partition[1], offset, max_bytes)) # client.send_fetch_request will collect topic/partition requests by leader @@ -494,7 +517,7 @@ class KafkaConsumer(object): offsets = self._offsets.task_done commits = [] - for topic_partition, task_done_offset in offsets.iteritems(): + for topic_partition, task_done_offset in six.iteritems(offsets): # Skip if None if task_done_offset is None: @@ -538,9 +561,9 @@ class KafkaConsumer(object): # def _consume_topic_partition(self, topic, partition): - if not isinstance(topic, six.string_types): + if not isinstance(topic, six.binary_type): raise KafkaConfigurationError('Unknown topic type (%s) ' - '-- expected string' % type(topic)) + '-- expected bytes' % type(topic)) if not isinstance(partition, int): raise KafkaConfigurationError('Unknown partition type (%s) ' '-- expected int' % type(partition)) @@ -677,6 +700,9 @@ class KafkaConsumer(object): def __iter__(self): return self + def __next__(self): + return self.next() + def _get_message_iterator(self): # Fetch a new batch if needed if self._msg_iter is None: |