-rw-r--r--  kafka/client_async.py                 |  4 ++
-rw-r--r--  kafka/coordinator/consumer.py         |  4 ++
-rw-r--r--  kafka/producer/record_accumulator.py  |  6 +-
-rw-r--r--  test/__init__.py                      | 11 ++
-rw-r--r--  test/fixtures.py                      | 15 +-
-rw-r--r--  test/service.py                       |  4 -
-rw-r--r--  test/testutil.py                      |  3 -
7 files changed, 31 insertions(+), 16 deletions(-)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 973ece0..57aea66 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -532,6 +532,8 @@ class KafkaClient(object):
             return 9999999999
 
         node_id = self.least_loaded_node()
+        if node_id is None:
+            return 0
 
         topics = list(self._topics)
         if self.cluster.need_all_topic_metadata:
@@ -588,6 +590,8 @@
         """Attempt to guess the broker version"""
         if node_id is None:
             node_id = self.least_loaded_node()
+            if node_id is None:
+                raise Errors.NoBrokersAvailable()
 
         def connect(node_id):
             timeout_at = time.time() + timeout
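
Both client_async.py hunks guard the same failure mode: least_loaded_node() returns None when no broker is connected or connectable (easy to hit while the test fixtures are still bootstrapping), and the old code passed that None along as if it were a node id. A minimal sketch of the guard pattern, using hypothetical stand-in classes rather than the real KafkaClient internals:

    class NoBrokersAvailable(Exception):
        """Raised when no broker can be selected for a request."""

    class FakeClient(object):
        def __init__(self, nodes):
            self._nodes = nodes  # e.g. ['broker-1'], or [] if bootstrap failed

        def least_loaded_node(self):
            # Like the real method, returns None when no node is usable.
            return self._nodes[0] if self._nodes else None

        def check_version(self):
            node_id = self.least_loaded_node()
            if node_id is None:
                raise NoBrokersAvailable()
            return node_id

    client = FakeClient(nodes=[])
    try:
        client.check_version()
    except NoBrokersAvailable:
        print('no brokers yet; the caller can retry with backoff')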
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 0e610c7..a5e3067 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -414,6 +414,8 @@ class ConsumerCoordinator(BaseCoordinator):
             node_id = self.coordinator_id
         else:
             node_id = self._client.least_loaded_node()
+            if node_id is None:
+                return Future().failure(Errors.NoBrokersAvailable)
 
         # create the offset commit request
         offset_data = collections.defaultdict(dict)
@@ -560,6 +562,8 @@
             node_id = self.coordinator_id
         else:
             node_id = self._client.least_loaded_node()
+            if node_id is None:
+                return Future().failure(Errors.NoBrokersAvailable)
 
         # Verify node is ready
         if not self._client.ready(node_id):
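
The coordinator paths are Future-based, so an unavailable node becomes a failed Future the caller can inspect or chain, rather than an exception raised mid-call. A condensed sketch of that convention, with a toy Future stand-in (not the real kafka.future.Future):

    class NoBrokersAvailable(Exception):
        pass

    class Future(object):
        """Toy stand-in: just enough to show the failure() convention."""
        def __init__(self):
            self.exception = None

        def failure(self, exception):
            self.exception = exception
            return self

    def _send_offset_commit_request(node_id):
        # Mirrors the patched path: no node means a failed Future, not a raise.
        if node_id is None:
            return Future().failure(NoBrokersAvailable)
        return Future()  # the real method would build and send the request

    result = _send_offset_commit_request(None)
    assert result.exception is NoBrokersAvailable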
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 958d207..19dc199 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -248,11 +248,12 @@ class RecordAccumulator(object):
         expired_batches = []
         to_remove = []
         count = 0
-        for tp, dq in six.iteritems(self._batches):
+        for tp in list(self._batches.keys()):
             assert tp in self._tp_locks, 'TopicPartition not in locks dict'
             with self._tp_locks[tp]:
                 # iterate over the batches and expire them if they have stayed
                 # in accumulator for more than request_timeout_ms
+                dq = self._batches[tp]
                 for batch in dq:
                     # check if the batch is expired
                     if batch.maybe_expire(request_timeout_ms,
@@ -367,8 +368,9 @@
 
     def has_unsent(self):
         """Return whether there is any unsent record in the accumulator."""
-        for tp, dq in six.iteritems(self._batches):
+        for tp in list(self._batches.keys()):
             with self._tp_locks[tp]:
+                dq = self._batches[tp]
                 if len(dq):
                     return True
         return False
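
Both hunks replace six.iteritems(self._batches) with a list(...) snapshot because the accumulator's dict can gain partitions from other threads mid-iteration, and iterating a live dict that changes size raises RuntimeError; re-reading dq under the per-partition lock also keeps the deque access consistent. A small single-threaded sketch of the hazard:

    batches = {'tp0': ['batch'], 'tp1': ['batch']}

    try:
        for tp in batches:        # live view of the dict
            batches['tp2'] = []   # simulates another thread adding a partition
    except RuntimeError as e:
        print(e)                  # dictionary changed size during iteration

    # The patched pattern: iterate a snapshot, then re-read per key
    # (the real code does the re-read inside self._tp_locks[tp]).
    for tp in list(batches.keys()):
        dq = batches[tp]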
diff --git a/test/__init__.py b/test/__init__.py
index da1069f..f91d0fa 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -4,3 +4,14 @@ if sys.version_info < (2, 7):
     import unittest2 as unittest # pylint: disable=import-error
 else:
     import unittest
+
+# Set default logging handler to avoid "No handler found" warnings.
+import logging
+try:  # Python 2.7+
+    from logging import NullHandler
+except ImportError:
+    class NullHandler(logging.Handler):
+        def emit(self, record):
+            pass
+
+logging.getLogger(__name__).addHandler(NullHandler())
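
This is the standard logging-for-libraries convention: attach a NullHandler so that importing the package never triggers Python 2's "No handlers could be found" warning, while leaving output entirely up to the application. A quick sketch of the application side (illustrative only):

    import logging

    # An application that wants to see the test package's logs opts in:
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('test.fixtures').debug('now visible')

    # Without basicConfig, records hit the NullHandler and are dropped silently.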
diff --git a/test/fixtures.py b/test/fixtures.py
index 0ddcf11..0b75ffd 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -79,8 +79,10 @@ class Fixture(object):
 
     @classmethod
     def render_template(cls, source_file, target_file, binding):
+        log.info('Rendering %s from template %s', target_file, source_file)
         with open(source_file, "r") as handle:
             template = handle.read()
+            assert len(template) > 0, 'Empty template %s' % source_file
         with open(target_file, "w") as handle:
             handle.write(template.format(**binding))
             handle.flush()
@@ -139,22 +141,22 @@ class ZookeeperFixture(Fixture):
         env = self.kafka_run_class_env()
 
         # Party!
-        self.out("Starting...")
         timeout = 5
         max_timeout = 30
         backoff = 1
         end_at = time.time() + max_timeout
+        tries = 1
         while time.time() < end_at:
-            log.critical('Starting Zookeeper instance')
+            self.out('Attempting to start (try #%d)' % tries)
             self.child = SpawnedService(args, env)
             self.child.start()
             timeout = min(timeout, max(end_at - time.time(), 0))
             if self.child.wait_for(r"binding to port", timeout=timeout):
                 break
-            log.critical('Zookeeper did not start within timeout %s secs', timeout)
             self.child.stop()
             timeout *= 2
             time.sleep(backoff)
+            tries += 1
         else:
             raise Exception('Failed to start Zookeeper before max_timeout')
         self.out("Done!")
@@ -260,8 +262,6 @@ class KafkaFixture(Fixture):
             raise RuntimeError("Failed to create Zookeeper chroot node")
         self.out("Done!")
 
-        self.out("Starting...")
-
         # Configure Kafka child process
         args = self.kafka_run_class_args("kafka.Kafka", properties)
         env = self.kafka_run_class_env()
@@ -270,18 +270,19 @@
         max_timeout = 30
         backoff = 1
         end_at = time.time() + max_timeout
+        tries = 1
         while time.time() < end_at:
-            log.critical('Starting Kafka instance')
+            self.out('Attempting to start (try #%d)' % tries)
             self.child = SpawnedService(args, env)
             self.child.start()
             timeout = min(timeout, max(end_at - time.time(), 0))
             if self.child.wait_for(r"\[Kafka Server %d\], Started" %
                                    self.broker_id, timeout=timeout):
                 break
-            log.critical('Kafka did not start within timeout %s secs', timeout)
             self.child.stop()
             timeout *= 2
             time.sleep(backoff)
+            tries += 1
         else:
             raise Exception('Failed to start KafkaInstance before max_timeout')
         self.out("Done!")
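
Both fixture start loops now share one retry-with-backoff shape: attempt a start, wait up to the current timeout for the readiness pattern, stop the child on failure, double the timeout, sleep, and retry until max_timeout, with while/else raising when every attempt fails. A condensed, self-contained sketch, where start_service and looks_ready are hypothetical callables:

    import time

    def start_with_retries(start_service, looks_ready, max_timeout=30):
        timeout, backoff, tries = 5, 1, 1
        end_at = time.time() + max_timeout
        while time.time() < end_at:
            print('Attempting to start (try #%d)' % tries)
            child = start_service()
            timeout = min(timeout, max(end_at - time.time(), 0))
            if looks_ready(child, timeout):
                return child       # plays the role of `break` in the fixture
            child.stop()
            timeout *= 2           # be more patient on the next attempt
            time.sleep(backoff)
            tries += 1
        raise Exception('Failed to start before max_timeout')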
diff --git a/test/service.py b/test/service.py
index 0e47835..55cc739 100644
--- a/test/service.py
+++ b/test/service.py
@@ -99,10 +99,6 @@ class SpawnedService(threading.Thread):
         while True:
             elapsed = time.time() - start
             if elapsed >= timeout:
-                try:
-                    self.child.kill()
-                except:
-                    log.exception("Received exception when killing child process")
                 log.error("Waiting for %r timed out after %d seconds", pattern, timeout)
                 return False
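
With the kill() removed, wait_for() only reports success or timeout; the fixture retry loops above own the child's lifecycle and call self.child.stop() themselves. A condensed sketch of the reporting-only shape, where pattern_seen is a hypothetical callable:

    import time

    def wait_for(child, pattern_seen, timeout=30):
        start = time.time()
        while True:
            if time.time() - start >= timeout:
                return False       # no kill() here; the caller decides
            if pattern_seen(child):
                return True
            time.sleep(0.1)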
diff --git a/test/testutil.py b/test/testutil.py
index 323780c..1d1f6ea 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -142,6 +142,3 @@ class Timer(object):
     def __exit__(self, *args):
         self.end = time.time()
         self.interval = self.end - self.start
-
-logging.getLogger('test.fixtures').setLevel(logging.ERROR)
-logging.getLogger('test.service').setLevel(logging.ERROR)
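
Dropping the module-level setLevel(logging.ERROR) lines completes the switch to the NullHandler added in test/__init__.py: the fixture and service loggers are no longer silenced for every consumer of the package. For context, a usage sketch of the Timer context manager from the hunk, assuming a conventional __enter__ that records the start time:

    import time

    class Timer(object):
        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *args):
            self.end = time.time()
            self.interval = self.end - self.start

    with Timer() as t:
        time.sleep(0.2)
    print('block took %.3f seconds' % t.interval)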