summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
1 files changed, 28 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 07fdc6fc93..282b639f61 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -103,6 +103,7 @@
* done single-threaded, bypassing the normal PollableQueues because
* the Poller is not active at this point to service them.
*/
+#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterSettings.h"
#include "qpid/cluster/Connection.h"
@@ -153,15 +154,16 @@
namespace qpid {
namespace cluster {
+using namespace qpid;
using namespace qpid::framing;
using namespace qpid::sys;
-using namespace std;
using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
+using namespace framing::cluster;
+using namespace std;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**
@@ -184,10 +186,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& start, const Uuid& stop)
+ uint8_t storeState, const Uuid& shutdownId)
{
cluster.initialStatus(member, version, active, clusterId,
- framing::cluster::StoreState(storeState), start, stop, l);
+ framing::cluster::StoreState(storeState), shutdownId, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -254,8 +256,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
// Load my store status before we go into initialization
- if (! broker::NullMessageStore::isNullStore(&broker.getStore()))
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getClusterId())
+ clusterId = store.getClusterId(); // Use stored ID if there is one.
+ }
cpg.join(name);
// Pump the CPG dispatch manually till we get initialized.
@@ -606,14 +611,18 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
+ // Check that cluster ID matches persistent store.
+ Uuid agreedId = initMap.getClusterId();
+ if (store.hasStore()) {
+ Uuid storeId = store.getClusterId();
+ if (storeId && storeId != agreedId)
+ throw Exception(
+ QPID_MSG("Persistent cluster-id " << storeId
+ << " doesn't match cluster " << agreedId));
+ store.dirty(agreedId);
+ }
+ setClusterId(agreedId, l);
- setClusterId(initMap.getClusterId(), l);
- // FIXME aconway 2009-11-20: store id == cluster id.
- // Clean up redundant copy of id in InitialStatus
- // Use store ID as advertized cluster ID.
- // Consistency check on cluster ID vs. locally stored ID.
- // throw rathr than assert in StoreStatus.
- if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
state = JOINER;
@@ -645,7 +654,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getStart(), store.getStop()
+ store.getState(), store.getShutdownId()
),
self);
}
@@ -690,7 +699,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
const framing::Uuid& id,
framing::cluster::StoreState store,
- const framing::Uuid& start, const framing::Uuid& end,
+ const framing::Uuid& shutdownId,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -701,8 +710,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
}
initMap.received(
member,
- ClusterInitialStatusBody(
- ProtocolVersion(), version, active, id, store, start, end)
+ ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
);
if (initMap.transitionToComplete()) {
QPID_LOG(debug, *this << " initial status map complete. ");