summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp28
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp4
-rw-r--r--cpp/src/qpid/cluster/ClusterTimer.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp49
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/CredentialsExchange.cpp3
-rw-r--r--cpp/src/qpid/cluster/CredentialsExchange.h2
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.cpp2
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.h2
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp2
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp11
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.cpp4
-rw-r--r--cpp/src/qpid/cluster/UpdateDataExchange.h3
13 files changed, 92 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 3c1d23c842..34aaf3d341 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -131,6 +131,7 @@
#include "qpid/cluster/UpdateExchange.h"
#include "qpid/cluster/ClusterTimer.h"
#include "qpid/cluster/CredentialsExchange.h"
+#include "qpid/cluster/UpdateClient.h"
#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
@@ -202,7 +203,7 @@ namespace arg=client::arg;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1207877;
+const uint32_t Cluster::CLUSTER_VERSION = 1332342;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -269,7 +270,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
"Error delivering frames",
poller),
failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)),
- updateDataExchange(new UpdateDataExchange(*this)),
credentialsExchange(new CredentialsExchange(*this)),
quorum(boost::bind(&Cluster::leave, this)),
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
@@ -295,15 +295,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Failover exchange provides membership updates to clients.
broker.getExchanges().registerExchange(failoverExchange);
- // Update exchange is used during updates to replicate messages
- // without modifying delivery-properties.exchange.
- broker.getExchanges().registerExchange(
- boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
-
- // Update-data exchange is used for passing data that may be too large
- // for single control frame.
- broker.getExchanges().registerExchange(updateDataExchange);
-
// CredentialsExchange is used to authenticate new cluster members
broker.getExchanges().registerExchange(credentialsExchange);
@@ -680,6 +671,17 @@ void Cluster::initMapCompleted(Lock& l) {
authenticate();
broker.setRecovery(false); // Ditch my current store.
broker.setClusterUpdatee(true);
+
+ // Update exchange is used during updates to replicate messages
+ // without modifying delivery-properties.exchange.
+ broker.getExchanges().registerExchange(
+ boost::shared_ptr<broker::Exchange>(new UpdateExchange(this)));
+
+ // Update-data exchange is used during update for passing data that
+ // may be too large for single control frame.
+ updateDataExchange.reset(new UpdateDataExchange(*this));
+ broker.getExchanges().registerExchange(updateDataExchange);
+
if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update.
state = JOINER;
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
@@ -999,6 +1001,10 @@ void Cluster::checkUpdateIn(Lock& l) {
boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
+ // FIXME aconway 2012-04-04: unregister/delete Update[Data]Exchange
+ updateDataExchange.reset();
+ broker.getExchanges().destroy(UpdateDataExchange::EXCHANGE_NAME);
+ broker.getExchanges().destroy(UpdateClient::UPDATE);
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index a8389095c9..d9817db35f 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -21,6 +21,7 @@
#include "qpid/cluster/ClusterMap.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
#include <algorithm>
@@ -29,7 +30,8 @@
#include <ostream>
using namespace std;
-using namespace boost;
+using boost::ref;
+using boost::optional;
namespace qpid {
using namespace framing;
diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp
index b4f7d00f38..90e4fa9d4d 100644
--- a/cpp/src/qpid/cluster/ClusterTimer.cpp
+++ b/cpp/src/qpid/cluster/ClusterTimer.cpp
@@ -24,6 +24,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
#include "qpid/framing/ClusterTimerDropBody.h"
+#include "qpid/sys/ClusterSafe.h"
namespace qpid {
namespace cluster {
@@ -107,6 +108,7 @@ void ClusterTimer::drop(intrusive_ptr<TimerTask> t) {
// Deliver thread
void ClusterTimer::deliverWakeup(const std::string& name) {
QPID_LOG(trace, "Cluster timer wakeup delivered for " << name);
+ qpid::sys::assertClusterSafe();
Map::iterator i = map.find(name);
if (i == map.end())
throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name));
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index fc6ada096f..512e0f03cb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -47,6 +47,7 @@
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ClusterSafe.h"
#include "qpid/types/Variant.h"
@@ -796,6 +797,54 @@ void Connection::config(const std::string& encoded) {
else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind));
}
+namespace {
+ // find a Link that matches the given Address
+ class LinkFinder {
+ qpid::Address id;
+ boost::shared_ptr<broker::Link> link;
+ public:
+ LinkFinder(const qpid::Address& _id) : id(_id) {}
+ boost::shared_ptr<broker::Link> getLink() { return link; }
+ void operator() (boost::shared_ptr<broker::Link> l)
+ {
+ if (!link) {
+ qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
+ if (id == addr) {
+ link = l;
+ }
+ }
+ }
+ };
+}
+
+void Connection::internalState(const std::string& type,
+ const std::string& name,
+ const framing::FieldTable& state)
+{
+ if (type == "link") {
+ // name is the string representation of the Link's _configured_ destination address
+ Url dest;
+ try {
+ dest = name;
+ } catch(...) {
+ throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
+ }
+ assert(dest.size());
+ LinkFinder finder(dest[0]);
+ cluster.getBroker().getLinks().eachLink(boost::ref(finder));
+ if (finder.getLink()) {
+ try {
+ finder.getLink()->setState(state);
+ } catch(...) {
+ throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
+ }
+ QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+ } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
+ }
+ else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));
+}
+
+
void Connection::doCatchupIoCallbacks() {
// We need to process IO callbacks during the catch-up phase in
// order to service asynchronous completions for messages
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 920c4937db..26514c76e2 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -200,6 +200,8 @@ class Connection :
const std::string& instance);
void config(const std::string& encoded);
+ void internalState(const std::string& type, const std::string& name,
+ const framing::FieldTable& state);
void setSecureConnection ( broker::SecureConnection * sc );
diff --git a/cpp/src/qpid/cluster/CredentialsExchange.cpp b/cpp/src/qpid/cluster/CredentialsExchange.cpp
index 0fafc521cd..416a3636e9 100644
--- a/cpp/src/qpid/cluster/CredentialsExchange.cpp
+++ b/cpp/src/qpid/cluster/CredentialsExchange.cpp
@@ -62,7 +62,8 @@ bool CredentialsExchange::check(MemberId member) {
return valid;
}
-void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) {
+void CredentialsExchange::route(broker::Deliverable& msg) {
+ const framing::FieldTable* args = msg.getMessage().getApplicationHeaders();
sys::Mutex::ScopedLock l(lock);
const broker::ConnectionState* connection =
static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher());
diff --git a/cpp/src/qpid/cluster/CredentialsExchange.h b/cpp/src/qpid/cluster/CredentialsExchange.h
index 90fd188271..74cf8350a6 100644
--- a/cpp/src/qpid/cluster/CredentialsExchange.h
+++ b/cpp/src/qpid/cluster/CredentialsExchange.h
@@ -50,7 +50,7 @@ class CredentialsExchange : public broker::Exchange
bool check(MemberId member);
/** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Exchange overrides
std::string getType() const;
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp
index 43ec27cf2c..87202a887c 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -80,7 +80,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con
return queues.find(queue) != queues.end();
}
-void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) {
+void FailoverExchange::route(Deliverable&) {
QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring");
}
diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h
index c3e50c6929..5ac734a7ac 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/cpp/src/qpid/cluster/FailoverExchange.h
@@ -54,7 +54,7 @@ class FailoverExchange : public broker::Exchange
bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
- void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
private:
void sendUpdate(const boost::shared_ptr<broker::Queue>&);
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index eb65005a9e..fc53d1076b 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -30,9 +30,9 @@ namespace qpid {
namespace cluster {
using namespace std;
-using namespace boost;
using namespace framing::cluster;
using namespace framing;
+using boost::optional;
InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
: self(self_), completed(), resendNeeded(), size(size_)
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 95c64ff060..20684fd8a7 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -57,6 +57,7 @@
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
#include "qpid/framing/ClusterConnectionSessionStateBody.h"
#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/framing/TypeCode.h"
@@ -687,7 +688,15 @@ void UpdateClient::updateLinks() {
void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
QPID_LOG(debug, *this << " updating link "
<< link->getHost() << ":" << link->getPort());
- ClusterConnectionProxy(session).config(encode(*link));
+ ClusterConnectionProxy(session).config(encode(*link)); // push the configuration
+ // now push the current state
+ framing::FieldTable state;
+ link->getState(state);
+ std::ostringstream os;
+ os << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
+ ClusterConnectionProxy(session).internalState(std::string("link"),
+ os.str(),
+ state);
}
void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) {
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
index e5cd82e3d3..31d96c67ca 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp
@@ -40,9 +40,9 @@ UpdateDataExchange::UpdateDataExchange(Cluster& cluster) :
Exchange(EXCHANGE_NAME, &cluster)
{}
-void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey,
- const qpid::framing::FieldTable* )
+void UpdateDataExchange::route(broker::Deliverable& msg)
{
+ const std::string& routingKey = msg.getMessage().getRoutingKey();
std::string data = msg.getMessage().getFrames().getContent();
if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data;
else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data;
diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h
index d2f6c35ad0..f79430f111 100644
--- a/cpp/src/qpid/cluster/UpdateDataExchange.h
+++ b/cpp/src/qpid/cluster/UpdateDataExchange.h
@@ -50,8 +50,7 @@ class UpdateDataExchange : public broker::Exchange
UpdateDataExchange(Cluster& parent);
- void route(broker::Deliverable& msg, const std::string& routingKey,
- const framing::FieldTable* args);
+ void route(broker::Deliverable& msg);
// Not implemented
std::string getType() const { return EXCHANGE_TYPE; }