diff options
-rw-r--r-- | kafka/client_async.py | 4 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 4 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 6 | ||||
-rw-r--r-- | test/__init__.py | 11 | ||||
-rw-r--r-- | test/fixtures.py | 15 | ||||
-rw-r--r-- | test/service.py | 4 | ||||
-rw-r--r-- | test/testutil.py | 3 |
7 files changed, 31 insertions, 16 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 973ece0..57aea66 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -532,6 +532,8 @@ class KafkaClient(object): return 9999999999 node_id = self.least_loaded_node() + if node_id is None: + return 0 topics = list(self._topics) if self.cluster.need_all_topic_metadata: @@ -588,6 +590,8 @@ class KafkaClient(object): """Attempt to guess the broker version""" if node_id is None: node_id = self.least_loaded_node() + if node_id is None: + raise Errors.NoBrokersAvailable() def connect(node_id): timeout_at = time.time() + timeout diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 0e610c7..a5e3067 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -414,6 +414,8 @@ class ConsumerCoordinator(BaseCoordinator): node_id = self.coordinator_id else: node_id = self._client.least_loaded_node() + if node_id is None: + return Future().failure(Errors.NoBrokersAvailable) # create the offset commit request offset_data = collections.defaultdict(dict) @@ -560,6 +562,8 @@ class ConsumerCoordinator(BaseCoordinator): node_id = self.coordinator_id else: node_id = self._client.least_loaded_node() + if node_id is None: + return Future().failure(Errors.NoBrokersAvailable) # Verify node is ready if not self._client.ready(node_id): diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 958d207..19dc199 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -248,11 +248,12 @@ class RecordAccumulator(object): expired_batches = [] to_remove = [] count = 0 - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): assert tp in self._tp_locks, 'TopicPartition not in locks dict' with self._tp_locks[tp]: # iterate over the batches and expire them if they have stayed # in accumulator for more than request_timeout_ms + dq = self._batches[tp] for batch in dq: # check if the batch is expired if batch.maybe_expire(request_timeout_ms, @@ -367,8 +368,9 @@ class RecordAccumulator(object): def has_unsent(self): """Return whether there is any unsent record in the accumulator.""" - for tp, dq in six.iteritems(self._batches): + for tp in list(self._batches.keys()): with self._tp_locks[tp]: + dq = self._batches[tp] if len(dq): return True return False diff --git a/test/__init__.py b/test/__init__.py index da1069f..f91d0fa 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -4,3 +4,14 @@ if sys.version_info < (2, 7): import unittest2 as unittest # pylint: disable=import-error else: import unittest + +# Set default logging handler to avoid "No handler found" warnings. +import logging +try: # Python 2.7+ + from logging import NullHandler +except ImportError: + class NullHandler(logging.Handler): + def emit(self, record): + pass + +logging.getLogger(__name__).addHandler(NullHandler()) diff --git a/test/fixtures.py b/test/fixtures.py index 0ddcf11..0b75ffd 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -79,8 +79,10 @@ class Fixture(object): @classmethod def render_template(cls, source_file, target_file, binding): + log.info('Rendering %s from template %s', target_file, source_file) with open(source_file, "r") as handle: template = handle.read() + assert len(template) > 0, 'Empty template %s' % source_file with open(target_file, "w") as handle: handle.write(template.format(**binding)) handle.flush() @@ -139,22 +141,22 @@ class ZookeeperFixture(Fixture): env = self.kafka_run_class_env() # Party! - self.out("Starting...") timeout = 5 max_timeout = 30 backoff = 1 end_at = time.time() + max_timeout + tries = 1 while time.time() < end_at: - log.critical('Starting Zookeeper instance') + self.out('Attempting to start (try #%d)' % tries) self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) if self.child.wait_for(r"binding to port", timeout=timeout): break - log.critical('Zookeeper did not start within timeout %s secs', timeout) self.child.stop() timeout *= 2 time.sleep(backoff) + tries += 1 else: raise Exception('Failed to start Zookeeper before max_timeout') self.out("Done!") @@ -260,8 +262,6 @@ class KafkaFixture(Fixture): raise RuntimeError("Failed to create Zookeeper chroot node") self.out("Done!") - self.out("Starting...") - # Configure Kafka child process args = self.kafka_run_class_args("kafka.Kafka", properties) env = self.kafka_run_class_env() @@ -270,18 +270,19 @@ class KafkaFixture(Fixture): max_timeout = 30 backoff = 1 end_at = time.time() + max_timeout + tries = 1 while time.time() < end_at: - log.critical('Starting Kafka instance') + self.out('Attempting to start (try #%d)' % tries) self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) if self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id, timeout=timeout): break - log.critical('Kafka did not start within timeout %s secs', timeout) self.child.stop() timeout *= 2 time.sleep(backoff) + tries += 1 else: raise Exception('Failed to start KafkaInstance before max_timeout') self.out("Done!") diff --git a/test/service.py b/test/service.py index 0e47835..55cc739 100644 --- a/test/service.py +++ b/test/service.py @@ -99,10 +99,6 @@ class SpawnedService(threading.Thread): while True: elapsed = time.time() - start if elapsed >= timeout: - try: - self.child.kill() - except: - log.exception("Received exception when killing child process") log.error("Waiting for %r timed out after %d seconds", pattern, timeout) return False diff --git a/test/testutil.py b/test/testutil.py index 323780c..1d1f6ea 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -142,6 +142,3 @@ class Timer(object): def __exit__(self, *args): self.end = time.time() self.interval = self.end - self.start - -logging.getLogger('test.fixtures').setLevel(logging.ERROR) -logging.getLogger('test.service').setLevel(logging.ERROR) |