summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-03-07 21:01:49 +0000
committerAlan Conway <aconway@apache.org>2011-03-07 21:01:49 +0000
commita53705eb2513e91efdbece8133fe052c261c52d7 (patch)
treebb51d22f2d9234a4926b452ca3bc4d464e527f0c /qpid/cpp/src/tests
parent92da7592b082b3fc9e847b80d6db8538fb29be29 (diff)
downloadqpid-python-a53705eb2513e91efdbece8133fe052c261c52d7.tar.gz
QPID-3121: Cluster management inconsistency when using persistent store.
With the store doing async completions, completion IO callbacks could be queued differently on different nodes. This led to inconsistent management changes in a cluster when a connection was modified in an IO callback. Fix was to mark IO callback processing as not cluster safe, so connections don't record management stats during an IO callback. Test changes: - enable durable tests in test_management. - add substitutions to mask known issue of inconsistent "stats changed" messages. - add transactional client to test_management. - ignore heartbeat connection close logs in cluster_test_logs.py - make brokertest.retry more accurate - fix minor bug in brokertest.log_ready. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1078947 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py46
-rwxr-xr-xqpid/cpp/src/tests/cluster_test_logs.py13
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py4
3 files changed, 37 insertions, 26 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 6e771bf5d6..19e97ce7aa 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -67,7 +67,7 @@ class ExceptionWrapper:
def __init__(self, obj, msg):
self.obj = obj
self.msg = msg
-
+
def __getattr__(self, name):
func = getattr(self.obj, name)
if type(func) != callable:
@@ -97,11 +97,12 @@ def retry(function, timeout=10, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
+ deadline = time.time() + timeout
while not function():
- if delay > timeout: delay = timeout
+ remaining = deadline - time.time()
+ if remaining <= 0: return False
+ delay = min(delay, remaining)
time.sleep(delay)
- timeout -= delay
- if timeout <= 0: return False
delay *= 2
return True
@@ -191,7 +192,7 @@ class Popen(subprocess.Popen):
def unexpected(self,msg):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-
+
def stop(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
@@ -213,7 +214,7 @@ class Popen(subprocess.Popen):
self.unexpected("expected error")
finally:
self.wait() # Clean up the process.
-
+
def communicate(self, input=None):
if input:
self.stdin.write(input)
@@ -231,7 +232,7 @@ class Popen(subprocess.Popen):
def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
if self.returncode is None:
# Pass _deadstate only if it has been set, there is no _deadstate
- # parameter in Python 2.6
+ # parameter in Python 2.6
if _deadstate is None: ret = subprocess.Popen.poll(self)
else: ret = subprocess.Popen.poll(self, _deadstate)
@@ -255,7 +256,7 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-
+
def kill(self):
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
@@ -289,7 +290,7 @@ class Broker(Popen):
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
-
+
def get_log(self):
return os.path.abspath(self.log)
@@ -319,7 +320,7 @@ class Broker(Popen):
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
@@ -362,7 +363,7 @@ class Broker(Popen):
s = c.session(str(qpid.datatypes.uuid4()))
s.queue_declare(queue=queue)
c.close()
-
+
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
if xprops != None: s += ", x-declare:{" + xprops + "}"
@@ -406,13 +407,14 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
- if self._log_ready: return True
- self._log_ready = find_in_file("notice Broker running", self.log)
+ if not self._log_ready:
+ self._log_ready = find_in_file("notice Broker running", self.log)
+ return self._log_ready
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=30):
+ if not retry(self.log_ready, timeout=60):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
@@ -421,8 +423,8 @@ class Broker(Popen):
c = self.connect(**kwargs)
try: c.session()
finally: c.close()
- except: raise RethrownException(
- "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
+ except Exception,e: raise RethrownException(
+ "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
def store_state(self):
uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
@@ -431,7 +433,7 @@ class Broker(Popen):
if uuids[0] == null_uuid: return "empty"
if uuids[1] == null_uuid: return "dirty"
return "clean"
-
+
class Cluster:
"""A cluster of brokers in a test."""
@@ -486,7 +488,7 @@ class BrokerTest(TestCase):
rootdir = os.getcwd()
def configure(self, config): self.config=config
-
+
def setUp(self):
outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
@@ -561,7 +563,7 @@ class StoppableThread(Thread):
self.stopped = True
self.join()
if self.error: raise self.error
-
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
@@ -620,7 +622,7 @@ class NumberedSender(Thread):
self.join()
self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
-
+
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
@@ -647,7 +649,7 @@ class NumberedReceiver(Thread):
def read_message(self):
return int(self.receiver.stdout.readline())
-
+
def run(self):
try:
self.received = 0
@@ -679,7 +681,7 @@ class ErrorGenerator(StoppableThread):
self.broker=broker
broker.test.cleanup_stop(self)
self.start()
-
+
def run(self):
c = self.broker.connect_old()
try:
diff --git a/qpid/cpp/src/tests/cluster_test_logs.py b/qpid/cpp/src/tests/cluster_test_logs.py
index 25ddb3b74c..9f7d1e2f6c 100755
--- a/qpid/cpp/src/tests/cluster_test_logs.py
+++ b/qpid/cpp/src/tests/cluster_test_logs.py
@@ -61,10 +61,10 @@ def filter_log(log):
'warning CLOSING .* unsent data',
'Inter-broker link ',
'Running in a cluster, marking store',
- 'debug Sending keepalive signal to watchdog',
- 'last broker standing joined by 1 replicas, updating queue policies.'
+ 'debug Sending keepalive signal to watchdog', # Watchdog timer thread
+ 'last broker standing joined by 1 replicas, updating queue policies.',
+ 'Connection .* timed out: closing' # heartbeat connection close
])
- skip_re = re.compile(skip)
# Regex to match a UUID
uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
# Substitutions to remove expected differences
@@ -82,6 +82,13 @@ def filter_log(log):
(r' map={.*_object_name:([^,}]*)[,}].*', r' \1'), # V2 map - just keep name
(r'\d+-\d+-\d+--\d+', 'X-X-X--X'), # V1 Object IDs
]
+ # Substitutions to mask known issue: durable test shows inconsistent "changed stats for com.redhat.rhm.store:journal" messages.
+ skip += '|Changed V[12] statistics com.redhat.rhm.store:journal'
+ subs += [(r'to=console.obj.1.0.com.redhat.rhm.store.journal props=\d+ stats=\d+',
+ 'to=console.obj.1.0.com.redhat.rhm.store.journal props=NN stats=NN')]
+
+ skip_re = re.compile(skip)
+ subs = [(re.compile(pattern), subst) for pattern, subst in subs]
for l in open(log):
if skip_re.search(l): continue
for pattern,subst in subs: l = re.sub(pattern,subst,l)
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index b8407fbde8..3f19411d19 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -574,8 +574,10 @@ class LongTests(BrokerTest):
"""Start ordinary clients for a broker."""
cmds=[
["qpid-tool", "localhost:%s"%(broker.port())],
- ["qpid-perftest", "--count", 50000,
+ ["qpid-perftest", "--count=5000", "--durable=yes",
"--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
+ ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
+ "--port", broker.port()],
["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
["testagent", "localhost", str(broker.port())] ]
clients.append([ClientLoop(broker, cmd) for cmd in cmds])