summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-25 18:36:09 +0000
committerAlan Conway <aconway@apache.org>2009-11-25 18:36:09 +0000
commit8b804ca1645b09885ff2f3eb9a8540c842db92a2 (patch)
tree5bb06546d7a6ac7ac2a2a21d5649a774490eea79 /qpid
parent5f94901345298cea65cb2ab0b49b9ad721fc9cb3 (diff)
downloadqpid-python-8b804ca1645b09885ff2f3eb9a8540c842db92a2.tar.gz
Consistency checks for persistent cluster startup.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@884226 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp33
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h2
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp40
-rw-r--r--qpid/cpp/src/qpid/cluster/InitialStatusMap.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.cpp2
-rw-r--r--qpid/cpp/src/qpid/cluster/StoreStatus.h1
-rwxr-xr-xqpid/cpp/src/tests/cluster_tests.py49
-rw-r--r--qpid/cpp/xml/cluster.xml6
-rw-r--r--qpid/python/qpid/brokertest.py26
9 files changed, 106 insertions, 56 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 282b639f61..fa53fc5475 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -175,7 +175,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 835547;
+const uint32_t Cluster::CLUSTER_VERSION = 884125;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -202,7 +202,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.errorCheck(member, type, frameSeq, l);
}
- void shutdown() { cluster.shutdown(member, l); }
+ void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -287,7 +287,7 @@ void Cluster::initialize() {
default:
assert(0);
}
- QPID_LOG(notice, *this << (state == READY ? "joined" : "joining") << " cluster " << name << " with url=" << myUrl);
+ QPID_LOG(notice, *this << (state == READY ? " joined" : " joining") << " cluster " << name);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
@@ -601,6 +601,7 @@ void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
if (state == INIT) {
// We have status for all members so we can make join descisions.
+ initMap.checkConsistent();
elders = initMap.getElders();
QPID_LOG(debug, *this << " elders: " << elders);
if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
@@ -611,17 +612,8 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
- // Check that cluster ID matches persistent store.
- Uuid agreedId = initMap.getClusterId();
- if (store.hasStore()) {
- Uuid storeId = store.getClusterId();
- if (storeId && storeId != agreedId)
- throw Exception(
- QPID_MSG("Persistent cluster-id " << storeId
- << " doesn't match cluster " << agreedId));
- store.dirty(agreedId);
- }
- setClusterId(agreedId, l);
+ setClusterId(initMap.getClusterId(), l);
+ if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
@@ -822,13 +814,13 @@ void Cluster::checkUpdateIn(Lock&) {
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
- QPID_LOG(notice, *this << " update complete, starting catch-up, members: " << map);
+ QPID_LOG(notice, *this << " update complete, starting catch-up.");
deliverEventQueue.start();
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
state = JOINER;
- QPID_LOG(notice, *this << " update retracted, sending new update request");
+ QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
deliverEventQueue.start();
}
@@ -853,10 +845,9 @@ void Cluster::updateOutError(const std::exception& e) {
updateOutDone(l);
}
-void Cluster ::shutdown(const MemberId& , Lock& l) {
+void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) {
QPID_LOG(notice, *this << " cluster shut down by administrator.");
- // FIXME aconway 2009-11-20: incorrect! Need to pass UUID on shutdown command.
- if (store.hasStore()) store.clean(Uuid(true));
+ if (store.hasStore()) store.clean(Uuid(id));
leave(l);
}
@@ -885,13 +876,13 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
}
void Cluster::stopClusterNode(Lock& l) {
- QPID_LOG(notice, *this << " stopped by admin");
+ QPID_LOG(notice, *this << " cluster member stopped by administrator.");
leave(l);
}
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), self);
+ mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self);
}
void Cluster::memberUpdate(Lock& l) {
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 0f931bbe29..7872588307 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -160,7 +160,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void messageExpired(const MemberId&, uint64_t, Lock& l);
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
- void shutdown(const MemberId&, Lock&);
+ void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
index 51d6140008..a5618db3e6 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -29,6 +29,7 @@ namespace cluster {
using namespace std;
using namespace boost;
using namespace framing::cluster;
+using namespace framing;
InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
: self(self_), completed(), resendNeeded(), size(size_)
@@ -106,7 +107,6 @@ bool InitialStatusMap::hasStore(const Map::value_type& v) {
}
bool InitialStatusMap::isUpdateNeeded() {
- // FIXME aconway 2009-11-20: consistency checks isComplete or here?
assert(isComplete());
// We need an update if there are any active members.
if (find_if(map.begin(), map.end(), &isActive) != map.end()) return true;
@@ -145,7 +145,43 @@ framing::Uuid InitialStatusMap::getClusterId() {
if (i != map.end())
return i->second->getClusterId(); // An active member
else
- return map.begin()->second->getClusterId();
+ return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order
}
+void InitialStatusMap::checkConsistent() {
+ assert(isComplete());
+ bool persistent = (map.begin()->second->getStoreState() != STORE_STATE_NO_STORE);
+ Uuid clusterId;
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ // Must not mix transient and persistent members.
+ if (persistent != (i->second->getStoreState() != STORE_STATE_NO_STORE))
+ throw Exception("Mixing transient and persistent brokers in a cluster");
+ // Members with non-empty stores must have same cluster-id
+ switch (i->second->getStoreState()) {
+ case STORE_STATE_NO_STORE:
+ case STORE_STATE_EMPTY_STORE:
+ break;
+ case STORE_STATE_DIRTY_STORE:
+ case STORE_STATE_CLEAN_STORE:
+ if (!clusterId) clusterId = i->second->getClusterId();
+ assert(clusterId);
+ if (clusterId != i->second->getClusterId())
+ throw Exception("Cluster-id mismatch, brokers belonged to different clusters.");
+ }
+ }
+ // If this is a newly forming cluster, clean stores must have same shutdown-id
+ if (find_if(map.begin(), map.end(), &isActive) == map.end()) {
+ Uuid shutdownId;
+ for (Map::iterator i = map.begin(); i != map.end(); ++i) {
+ if (i->second->getStoreState() == STORE_STATE_CLEAN_STORE) {
+ if (!shutdownId) shutdownId = i->second->getShutdownId();
+ assert(shutdownId);
+ if (shutdownId != i->second->getShutdownId())
+ throw Exception("Shutdown-id mismatch, brokers were not shut down together.");
+ }
+ }
+ }
+}
+
+
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
index 72963ea2bb..40fd9ee49d 100644
--- a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -56,13 +56,14 @@ class InitialStatusMap
bool isUpdateNeeded();
/**@pre isComplete(). @return Cluster-wide cluster ID. */
framing::Uuid getClusterId();
+ /**@pre isComplete(). @throw Exception if there are any inconsistencies. */
+ void checkConsistent();
private:
typedef std::map<MemberId, boost::optional<Status> > Map;
static bool notInitialized(const Map::value_type&);
static bool isActive(const Map::value_type&);
static bool hasStore(const Map::value_type&);
- void check();
Map map;
MemberSet firstConfig;
MemberId self;
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
index 3602ec9188..a7da3baa50 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -85,8 +85,6 @@ void StoreStatus::dirty(const Uuid& clusterId_) {
}
void StoreStatus::clean(const Uuid& shutdownId_) {
- assert(clusterId); // FIXME aconway 2009-11-20: throw exception
- assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
shutdownId = shutdownId_;
save();
diff --git a/qpid/cpp/src/qpid/cluster/StoreStatus.h b/qpid/cpp/src/qpid/cluster/StoreStatus.h
index ead30b8fb8..539f46c10b 100644
--- a/qpid/cpp/src/qpid/cluster/StoreStatus.h
+++ b/qpid/cpp/src/qpid/cluster/StoreStatus.h
@@ -50,7 +50,6 @@ class StoreStatus
void save();
bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
- bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
private:
framing::cluster::StoreState state;
diff --git a/qpid/cpp/src/tests/cluster_tests.py b/qpid/cpp/src/tests/cluster_tests.py
index 1437c9e20a..78967196a9 100755
--- a/qpid/cpp/src/tests/cluster_tests.py
+++ b/qpid/cpp/src/tests/cluster_tests.py
@@ -122,9 +122,9 @@ class StoreTests(BrokerTest):
def test_persistent_restart(self):
"""Verify persistent cluster shutdown/restart scenarios"""
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_OK, wait_for_start=False)
- b = cluster.start("b", expect=EXPECT_EXIT_OK, wait_for_start=False)
- c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait_for_start=True)
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_FAIL, wait=True)
a.send_message("q", Message("1", durable=True))
# Kill & restart one member.
c.kill()
@@ -135,30 +135,30 @@ class StoreTests(BrokerTest):
# Shut down the entire cluster cleanly and bring it back up
a.send_message("q", Message("3", durable=True))
qpid_cluster.main(["qpid-cluster", "-kf", a.host_port()])
- a = cluster.start("a", wait_for_start=False)
- b = cluster.start("b", wait_for_start=False)
- c = cluster.start("c", wait_for_start=True)
+ a = cluster.start("a", wait=False)
+ b = cluster.start("b", wait=False)
+ c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "3")
def test_persistent_partial_failure(self):
# Kill 2 members, shut down the last cleanly then restart
# Ensure we use the clean database
cluster = self.cluster(0, args=self.args() + ["--cluster-size=3"])
- a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
- b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait_for_start=False)
- c = cluster.start("c", expect=EXPECT_EXIT_OK, wait_for_start=True)
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=True)
a.send_message("q", Message("4", durable=True))
a.kill()
b.kill()
self.assertEqual(c.get_message("q").content, "4")
c.send_message("q", Message("clean", durable=True))
qpid_cluster.main(["qpid-cluster", "-kf", c.host_port()])
- a = cluster.start("a", wait_for_start=False)
- b = cluster.start("b", wait_for_start=False)
- c = cluster.start("c", wait_for_start=True)
+ a = cluster.start("a", wait=False)
+ b = cluster.start("b", wait=False)
+ c = cluster.start("c", wait=True)
self.assertEqual(a.get_message("q").content, "clean")
- def test_wrong_store_uuid(self):
+ def test_wrong_cluster_id(self):
# Start a cluster1 broker, then try to restart in cluster2
cluster1 = self.cluster(0, args=self.args())
a = cluster1.start("a", expect=EXPECT_EXIT_OK)
@@ -168,4 +168,25 @@ class StoreTests(BrokerTest):
a = cluster2.start("a", expect=EXPECT_EXIT_FAIL)
self.fail("Expected exception")
except: pass
-
+
+ def test_wrong_shutdown_id(self):
+ # Start 2 members and shut down.
+ cluster = self.cluster(0, args=self.args()+["--cluster-size=2"])
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.assertEqual(a.wait(), 0)
+ self.assertEqual(b.wait(), 0)
+
+ # Restart with a different member and shut down.
+ a = cluster.start("a", expect=EXPECT_EXIT_OK, wait=False)
+ c = cluster.start("c", expect=EXPECT_EXIT_OK, wait=False)
+ self.assertEqual(0, qpid_cluster.main(["qpid_cluster", "-kf", a.host_port()]))
+ self.assertEqual(a.wait(), 0)
+ self.assertEqual(c.wait(), 0)
+
+ # Mix members from both shutdown events, they should fail
+ a = cluster.start("a", expect=EXPECT_EXIT_FAIL, wait=False)
+ b = cluster.start("b", expect=EXPECT_EXIT_FAIL, wait=False)
+
+
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index 49a1ea3638..c0b7127f68 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -92,9 +92,11 @@
<field name="type" type="error-type"/>
<field name="frame-seq" type="sequence-no"/>
</control>
-
- <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
+ <!-- Shut down the entire cluster -->
+ <control name="shutdown" code="0x20">
+ <field name="shutdown-id" type="uuid"/>
+ </control>
</class>
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 7458638cfd..e831060799 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -215,7 +215,7 @@ class Cluster:
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -225,17 +225,17 @@ class Cluster:
self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
assert BrokerTest.cluster_lib
self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait_for_start=wait_for_start)
+ self.start_n(count, expect=expect, wait=wait)
- def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
log.debug("Cluster %s starting member %s" % (self.name, name))
- self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start))
+ self._brokers.append(self.test.broker(self.args, name, expect, wait))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True):
- for i in range(count): self.start(expect=expect, wait_for_start=wait_for_start)
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True):
+ for i in range(count): self.start(expect=expect, wait=wait)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -275,8 +275,6 @@ class BrokerTest(TestCase):
except Exception, e: err.append(str(e))
if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
- # FIXME aconway 2009-11-06: check for core files of exited processes.
-
def cleanup_stop(self, stopable):
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
@@ -288,17 +286,21 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait_for_start=True):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True):
"""Create and return a broker ready for use"""
b = Broker(self, args=args, name=name, expect=expect)
- if (wait_for_start): b.connect().close()
+ if (wait): b.connect().close()
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait_for_start=wait_for_start)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
+ def wait():
+ """Wait for all brokers in the cluster to be ready"""
+ for b in _brokers: b.connect().close()
+
class RethrownException(Exception):
"""Captures the original stack trace to be thrown later"""
def __init__(self, e, msg=""):