-rw-r--r--  kafka/client.py                      2
-rw-r--r--  kafka/common.py                     10
-rw-r--r--  kafka/consumer.py                    4
-rw-r--r--  kafka/util.py                        4
-rw-r--r--  test/fixtures.py                     4
-rw-r--r--  test/service.py                     78
-rw-r--r--  test/test_consumer_integration.py    2
7 files changed, 38 insertions, 66 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 4870ab9..d0e07d0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -6,8 +6,6 @@ import kafka.common
from functools import partial
from itertools import count
-from kafka.common import *
-
from kafka.common import (TopicAndPartition,
ConnectionError, FailedPayloadsError,
PartitionUnavailableError,
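
The only change to client.py is dropping the wildcard import; the explicit import list that already followed it now stands alone. A rough sketch of the difference (illustrative only, assumes kafka-python is importable, not part of the patch):

    # With a star import every public name in kafka.common lands in this
    # module's namespace, so it is unclear which symbols client.py really
    # uses and tools like pyflakes cannot reliably flag undefined names:
    #
    #     from kafka.common import *
    #
    # The explicit form documents the actual dependencies:
    from kafka.common import (TopicAndPartition,
                              ConnectionError, FailedPayloadsError,
                              PartitionUnavailableError)
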
diff --git a/kafka/common.py b/kafka/common.py
index d288b89..d515532 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -121,11 +121,16 @@ class StaleControllerEpochError(BrokerResponseError):
message = 'STALE_CONTROLLER_EPOCH'
-class OffsetMetadataTooLarge(BrokerResponseError):
+class OffsetMetadataTooLargeError(BrokerResponseError):
errno = 12
message = 'OFFSET_METADATA_TOO_LARGE'
+class StaleLeaderEpochCodeError(BrokerResponseError):
+ errno = 13
+ message = 'STALE_LEADER_EPOCH_CODE'
+
+
class KafkaUnavailableError(KafkaError):
pass
@@ -178,7 +183,8 @@ kafka_errors = {
9 : ReplicaNotAvailableError,
10 : MessageSizeTooLargeError,
11 : StaleControllerEpochError,
- 12 : OffsetMetadataTooLarge,
+ 12 : OffsetMetadataTooLargeError,
+ 13 : StaleLeaderEpochCodeError,
}
def check_error(response):
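
The new error class slots into the errno-to-exception table consumed by check_error. A minimal sketch of how such a lookup is typically used; the helper body and simplified classes below are assumptions for illustration, not copied from kafka/common.py:

    class BrokerResponseError(Exception):
        # Stand-in for the real base class in kafka.common.
        errno = -1
        message = None

    class StaleLeaderEpochCodeError(BrokerResponseError):
        errno = 13
        message = 'STALE_LEADER_EPOCH_CODE'

    kafka_errors = {13: StaleLeaderEpochCodeError}

    def check_error(response):
        # Assumed behaviour: a non-zero error code from the broker is mapped
        # to its exception class and raised; 0 means the request succeeded.
        if response.error:
            raise kafka_errors.get(response.error, BrokerResponseError)(response)
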
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 085f5e8..ef8fbda 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -430,12 +430,12 @@ class SimpleConsumer(Consumer):
# Put the message in our queue
self.queue.put((partition, message))
self.fetch_offsets[partition] = message.offset + 1
- except ConsumerFetchSizeTooSmall as e:
+ except ConsumerFetchSizeTooSmall:
if (self.max_buffer_size is not None and
self.buffer_size == self.max_buffer_size):
log.error("Max fetch size %d too small",
self.max_buffer_size)
- raise e
+ raise
if self.max_buffer_size is None:
self.buffer_size *= 2
else:
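
In the consumer, the handler now re-raises with a bare raise instead of raise e. On Python 2 in particular, a bare raise keeps the original traceback, while raise e restarts the traceback at the re-raise site. A standalone illustration, unrelated to Kafka itself:

    def risky():
        raise ValueError("boom")

    def rethrow_bare():
        try:
            risky()
        except ValueError:
            # Bare raise re-raises the active exception with its original
            # traceback, so the report still points inside risky().
            raise

    def rethrow_bound():
        try:
            risky()
        except ValueError as e:
            # On Python 2, raise e builds a new traceback starting here,
            # which hides where the failure actually occurred.
            raise e
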
diff --git a/kafka/util.py b/kafka/util.py
index 0577a88..a918234 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,6 +1,6 @@
-import sys
-import struct
import collections
+import struct
+import sys
from threading import Thread, Event
from kafka.common import BufferUnderflowError
diff --git a/test/fixtures.py b/test/fixtures.py
index df6faec..df8cd42 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -98,8 +98,6 @@ class ZookeeperFixture(Fixture):
"org.apache.zookeeper.server.quorum.QuorumPeerMain",
properties
))
- self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
- self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
self.out("Starting...")
@@ -180,8 +178,6 @@ class KafkaFixture(Fixture):
self.child = SpawnedService(self.kafka_run_class_args(
"kafka.Kafka", properties
))
- self.child.configure_stdout(os.path.join(self.tmp_dir, "stdout.txt"))
- self.child.configure_stderr(os.path.join(self.tmp_dir, "stderr.txt"))
# Party!
self.out("Creating Zookeeper chroot node...")
diff --git a/test/service.py b/test/service.py
index 78a5f24..8872c82 100644
--- a/test/service.py
+++ b/test/service.py
@@ -1,3 +1,4 @@
+import logging
import re
import select
import subprocess
@@ -29,43 +30,15 @@ class SpawnedService(threading.Thread):
threading.Thread.__init__(self)
self.args = args
- self.captured_stdout = ""
- self.captured_stderr = ""
- self.stdout_file = None
- self.stderr_file = None
- self.capture_stdout = True
- self.capture_stderr = True
- self.show_stdout = True
- self.show_stderr = True
+ self.captured_stdout = []
+ self.captured_stderr = []
self.should_die = threading.Event()
- def configure_stdout(self, file=None, capture=True, show=False):
- self.stdout_file = file
- self.capture_stdout = capture
- self.show_stdout = show
-
- def configure_stderr(self, file=None, capture=False, show=False):
- self.stderr_file = file
- self.capture_stderr = capture
- self.show_stderr = show
-
def run(self):
- stdout_handle = None
- stderr_handle = None
- try:
- if self.stdout_file:
- stdout_handle = open(self.stdout_file, "w")
- if self.stderr_file:
- stderr_handle = open(self.stderr_file, "w")
- self.run_with_handles(stdout_handle, stderr_handle)
- finally:
- if stdout_handle:
- stdout_handle.close()
- if stderr_handle:
- stderr_handle.close()
-
- def run_with_handles(self, stdout_handle, stderr_handle):
+ self.run_with_handles()
+
+ def run_with_handles(self):
self.child = subprocess.Popen(
self.args,
bufsize=1,
@@ -78,35 +51,32 @@ class SpawnedService(threading.Thread):
if self.child.stdout in rds:
line = self.child.stdout.readline()
- if stdout_handle:
- stdout_handle.write(line)
- stdout_handle.flush()
- if self.capture_stdout:
- self.captured_stdout += line
- if self.show_stdout:
- sys.stdout.write(line)
- sys.stdout.flush()
+ self.captured_stdout.append(line)
if self.child.stderr in rds:
line = self.child.stderr.readline()
- if stderr_handle:
- stderr_handle.write(line)
- stderr_handle.flush()
- if self.capture_stderr:
- self.captured_stderr += line
- if self.show_stderr:
- sys.stderr.write(line)
- sys.stderr.flush()
+ self.captured_stderr.append(line)
if self.should_die.is_set():
self.child.terminate()
alive = False
- if self.child.poll() is not None:
+ poll_results = self.child.poll()
+ if poll_results is not None:
if not alive:
break
else:
- raise RuntimeError("Subprocess has died. Aborting.")
+ self.dump_logs()
+ raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))
+
+ def dump_logs(self):
+ logging.critical('stderr')
+ for line in self.captured_stderr:
+ logging.critical(line.rstrip())
+
+ logging.critical('stdout')
+ for line in self.captured_stdout:
+ logging.critical(line.rstrip())
def wait_for(self, pattern, timeout=10):
t1 = time.time()
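
The per-stream file/capture/show switches are gone: every line the child writes is appended to a list, and dump_logs replays both streams through logging when the service dies. A reduced sketch of the pattern (the class is trimmed to the relevant pieces; names mirror the patch):

    import logging

    class CaptureSketch(object):
        def __init__(self):
            # Appending lines to a list is cheap and keeps them separate,
            # so they can be re-joined for searching or replayed on failure
            # without repeatedly copying a growing string.
            self.captured_stdout = []
            self.captured_stderr = []

        def dump_logs(self):
            # Replay everything the child process wrote so a dead service
            # leaves its output in the test log.
            logging.critical('stderr')
            for line in self.captured_stderr:
                logging.critical(line.rstrip())
            logging.critical('stdout')
            for line in self.captured_stdout:
                logging.critical(line.rstrip())
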
@@ -117,11 +87,13 @@ class SpawnedService(threading.Thread):
self.child.kill()
except:
logging.exception("Received exception when killing child process")
+ self.dump_logs()
+
raise RuntimeError("Waiting for %r timed out" % pattern)
- if re.search(pattern, self.captured_stdout, re.IGNORECASE) is not None:
+ if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
return
- if re.search(pattern, self.captured_stderr, re.IGNORECASE) is not None:
+ if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
return
time.sleep(0.1)
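
Since the captured output is now a list of lines, wait_for joins it back into a single string before the regex search. A standalone, simplified version of the polling loop (the lines argument stands in for captured_stdout / captured_stderr and is an assumption for illustration):

    import re
    import time

    def wait_for(lines, pattern, timeout=10):
        deadline = time.time() + timeout
        while time.time() < deadline:
            if re.search(pattern, '\n'.join(lines), re.IGNORECASE) is not None:
                return True
            time.sleep(0.1)
        raise RuntimeError("Waiting for %r timed out" % pattern)
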
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index 9300021..da2faf6 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -20,7 +20,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
cls.server = cls.server1 # Bootstrapping server
@classmethod
- def tearDownClass(cls): # noqa
+ def tearDownClass(cls):
if not os.environ.get('KAFKA_VERSION'):
return