summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp48
-rw-r--r--cpp/src/qpid/cluster/Cluster.h5
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.cpp30
-rw-r--r--cpp/src/qpid/cluster/StoreStatus.h7
4 files changed, 49 insertions, 41 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 07fdc6fc93..282b639f61 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -103,6 +103,7 @@
* done single-threaded, bypassing the normal PollableQueues because
* the Poller is not active at this point to service them.
*/
+#include "qpid/Exception.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ClusterSettings.h"
#include "qpid/cluster/Connection.h"
@@ -153,15 +154,16 @@
namespace qpid {
namespace cluster {
+using namespace qpid;
using namespace qpid::framing;
using namespace qpid::sys;
-using namespace std;
using namespace qpid::cluster;
-using namespace qpid::framing::cluster;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
+using namespace framing::cluster;
+using namespace std;
+using management::ManagementAgent;
+using management::ManagementObject;
+using management::Manageable;
+using management::Args;
namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**
@@ -184,10 +186,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void initialStatus(uint32_t version, bool active, const Uuid& clusterId,
- uint8_t storeState, const Uuid& start, const Uuid& stop)
+ uint8_t storeState, const Uuid& shutdownId)
{
cluster.initialStatus(member, version, active, clusterId,
- framing::cluster::StoreState(storeState), start, stop, l);
+ framing::cluster::StoreState(storeState), shutdownId, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
@@ -254,8 +256,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
broker.getExchanges().registerExchange(
boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
// Load my store status before we go into initialization
- if (! broker::NullMessageStore::isNullStore(&broker.getStore()))
+ if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
+ if (store.getClusterId())
+ clusterId = store.getClusterId(); // Use stored ID if there is one.
+ }
cpg.join(name);
// Pump the CPG dispatch manually till we get initialized.
@@ -606,14 +611,18 @@ void Cluster::initMapCompleted(Lock& l) {
else {
QPID_LOG(info, this << " active for links.");
}
+ // Check that cluster ID matches persistent store.
+ Uuid agreedId = initMap.getClusterId();
+ if (store.hasStore()) {
+ Uuid storeId = store.getClusterId();
+ if (storeId && storeId != agreedId)
+ throw Exception(
+ QPID_MSG("Persistent cluster-id " << storeId
+ << " doesn't match cluster " << agreedId));
+ store.dirty(agreedId);
+ }
+ setClusterId(agreedId, l);
- setClusterId(initMap.getClusterId(), l);
- // FIXME aconway 2009-11-20: store id == cluster id.
- // Clean up redundant copy of id in InitialStatus
- // Use store ID as advertized cluster ID.
- // Consistency check on cluster ID vs. locally stored ID.
- // throw rathr than assert in StoreStatus.
- if (store.hasStore()) store.dirty(clusterId);
if (initMap.isUpdateNeeded()) { // Joining established cluster.
broker.setRecovery(false); // Ditch my current store.
state = JOINER;
@@ -645,7 +654,7 @@ void Cluster::configChange(const MemberId&, const std::string& configStr, Lock&
mcast.mcastControl(
ClusterInitialStatusBody(
ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getStart(), store.getStop()
+ store.getState(), store.getShutdownId()
),
self);
}
@@ -690,7 +699,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
const framing::Uuid& id,
framing::cluster::StoreState store,
- const framing::Uuid& start, const framing::Uuid& end,
+ const framing::Uuid& shutdownId,
Lock& l)
{
if (version != CLUSTER_VERSION) {
@@ -701,8 +710,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
}
initMap.received(
member,
- ClusterInitialStatusBody(
- ProtocolVersion(), version, active, id, store, start, end)
+ ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
);
if (initMap.transitionToComplete()) {
QPID_LOG(debug, *this << " initial status map complete. ");
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index c1ee0c2be1..0f931bbe29 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -151,10 +151,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void initialStatus(const MemberId&,
uint32_t version,
bool active,
- const framing::Uuid& id,
+ const framing::Uuid& clusterId,
framing::cluster::StoreState,
- const framing::Uuid& start,
- const framing::Uuid& end,
+ const framing::Uuid& shutdownId,
Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& current, Lock& l);
diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp
index 1c5f581ea1..3602ec9188 100644
--- a/cpp/src/qpid/cluster/StoreStatus.cpp
+++ b/cpp/src/qpid/cluster/StoreStatus.cpp
@@ -39,8 +39,8 @@ StoreStatus::StoreStatus(const std::string& d)
namespace {
const char* SUBDIR="cluster";
-const char* START_FILE="start";
-const char* STOP_FILE="stop";
+const char* CLUSTER_ID_FILE="cluster.uuid";
+const char* SHUTDOWN_ID_FILE="shutdown.uuid";
Uuid loadUuid(const path& path) {
Uuid ret;
@@ -62,33 +62,33 @@ void saveUuid(const path& path, const Uuid& uuid) {
void StoreStatus::load() {
path dir = path(dataDir)/SUBDIR;
create_directory(dir);
- start = loadUuid(dir/START_FILE);
- stop = loadUuid(dir/STOP_FILE);
+ clusterId = loadUuid(dir/CLUSTER_ID_FILE);
+ shutdownId = loadUuid(dir/SHUTDOWN_ID_FILE);
- if (start && stop) state = STORE_STATE_CLEAN_STORE;
- else if (start) state = STORE_STATE_DIRTY_STORE;
+ if (clusterId && shutdownId) state = STORE_STATE_CLEAN_STORE;
+ else if (clusterId) state = STORE_STATE_DIRTY_STORE;
else state = STORE_STATE_EMPTY_STORE;
}
void StoreStatus::save() {
path dir = path(dataDir)/SUBDIR;
create_directory(dir);
- saveUuid(dir/START_FILE, start);
- saveUuid(dir/STOP_FILE, stop);
+ saveUuid(dir/CLUSTER_ID_FILE, clusterId);
+ saveUuid(dir/SHUTDOWN_ID_FILE, shutdownId);
}
-void StoreStatus::dirty(const Uuid& start_) {
- start = start_;
- stop = Uuid();
+void StoreStatus::dirty(const Uuid& clusterId_) {
+ clusterId = clusterId_;
+ shutdownId = Uuid();
state = STORE_STATE_DIRTY_STORE;
save();
}
-void StoreStatus::clean(const Uuid& stop_) {
- assert(start); // FIXME aconway 2009-11-20: exception?
- assert(stop_);
+void StoreStatus::clean(const Uuid& shutdownId_) {
+ assert(clusterId); // FIXME aconway 2009-11-20: throw exception
+ assert(shutdownId_);
state = STORE_STATE_CLEAN_STORE;
- stop = stop_;
+ shutdownId = shutdownId_;
save();
}
diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h
index b4c6bda480..ead30b8fb8 100644
--- a/cpp/src/qpid/cluster/StoreStatus.h
+++ b/cpp/src/qpid/cluster/StoreStatus.h
@@ -40,8 +40,8 @@ class StoreStatus
StoreStatus(const std::string& dir);
framing::cluster::StoreState getState() const { return state; }
- Uuid getStart() const { return start; }
- Uuid getStop() const { return stop; }
+ const Uuid& getClusterId() const { return clusterId; }
+ const Uuid& getShutdownId() const { return shutdownId; }
void dirty(const Uuid& start); // Start using the store.
void clean(const Uuid& stop); // Stop using the store.
@@ -51,9 +51,10 @@ class StoreStatus
bool hasStore() { return state != framing::cluster::STORE_STATE_NO_STORE; }
bool isEmpty() { return state != framing::cluster::STORE_STATE_EMPTY_STORE; }
+
private:
framing::cluster::StoreState state;
- Uuid start, stop;
+ Uuid clusterId, shutdownId;
std::string dataDir;
};
}} // namespace qpid::cluster