diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/cluster | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterTimer.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/CredentialsExchange.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/CredentialsExchange.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateDataExchange.h | 3 |
13 files changed, 92 insertions, 22 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 3c1d23c842..34aaf3d341 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -131,6 +131,7 @@ #include "qpid/cluster/UpdateExchange.h" #include "qpid/cluster/ClusterTimer.h" #include "qpid/cluster/CredentialsExchange.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" @@ -202,7 +203,7 @@ namespace arg=client::arg; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 1207877; +const uint32_t Cluster::CLUSTER_VERSION = 1332342; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -269,7 +270,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : "Error delivering frames", poller), failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(*this)), credentialsExchange(new CredentialsExchange(*this)), quorum(boost::bind(&Cluster::leave, this)), decoder(boost::bind(&Cluster::deliverFrame, this, _1)), @@ -295,15 +295,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Failover exchange provides membership updates to clients. broker.getExchanges().registerExchange(failoverExchange); - // Update exchange is used during updates to replicate messages - // without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - - // Update-data exchange is used for passing data that may be too large - // for single control frame. - broker.getExchanges().registerExchange(updateDataExchange); - // CredentialsExchange is used to authenticate new cluster members broker.getExchanges().registerExchange(credentialsExchange); @@ -680,6 +671,17 @@ void Cluster::initMapCompleted(Lock& l) { authenticate(); broker.setRecovery(false); // Ditch my current store. broker.setClusterUpdatee(true); + + // Update exchange is used during updates to replicate messages + // without modifying delivery-properties.exchange. + broker.getExchanges().registerExchange( + boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); + + // Update-data exchange is used during update for passing data that + // may be too large for single control frame. + updateDataExchange.reset(new UpdateDataExchange(*this)); + broker.getExchanges().registerExchange(updateDataExchange); + if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. state = JOINER; mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); @@ -999,6 +1001,10 @@ void Cluster::checkUpdateIn(Lock& l) { boost::ref(broker.getExchanges()))); enableClusterSafe(); // Enable cluster-safe assertions deliverEventQueue.start(); + // FIXME aconway 2012-04-04: unregister/delete Update[Data]Exchange + updateDataExchange.reset(); + broker.getExchanges().destroy(UpdateDataExchange::EXCHANGE_NAME); + broker.getExchanges().destroy(UpdateClient::UPDATE); } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index a8389095c9..d9817db35f 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -21,6 +21,7 @@ #include "qpid/cluster/ClusterMap.h" #include "qpid/Url.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include <boost/bind.hpp> #include <algorithm> @@ -29,7 +30,8 @@ #include <ostream> using namespace std; -using namespace boost; +using boost::ref; +using boost::optional; namespace qpid { using namespace framing; diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp index b4f7d00f38..90e4fa9d4d 100644 --- a/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/ClusterTimerDropBody.h" +#include "qpid/sys/ClusterSafe.h" namespace qpid { namespace cluster { @@ -107,6 +108,7 @@ void ClusterTimer::drop(intrusive_ptr<TimerTask> t) { // Deliver thread void ClusterTimer::deliverWakeup(const std::string& name) { QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); + qpid::sys::assertClusterSafe(); Map::iterator i = map.find(name); if (i == map.end()) throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index fc6ada096f..512e0f03cb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -47,6 +47,7 @@ #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ConnectionCloseBody.h" #include "qpid/framing/ConnectionCloseOkBody.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/sys/ClusterSafe.h" #include "qpid/types/Variant.h" @@ -796,6 +797,54 @@ void Connection::config(const std::string& encoded) { else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +namespace { + // find a Link that matches the given Address + class LinkFinder { + qpid::Address id; + boost::shared_ptr<broker::Link> link; + public: + LinkFinder(const qpid::Address& _id) : id(_id) {} + boost::shared_ptr<broker::Link> getLink() { return link; } + void operator() (boost::shared_ptr<broker::Link> l) + { + if (!link) { + qpid::Address addr(l->getTransport(), l->getHost(), l->getPort()); + if (id == addr) { + link = l; + } + } + } + }; +} + +void Connection::internalState(const std::string& type, + const std::string& name, + const framing::FieldTable& state) +{ + if (type == "link") { + // name is the string representation of the Link's _configured_ destination address + Url dest; + try { + dest = name; + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name)); + } + assert(dest.size()); + LinkFinder finder(dest[0]); + cluster.getBroker().getLinks().eachLink(boost::ref(finder)); + if (finder.getLink()) { + try { + finder.getLink()->setState(state); + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state)); + } + QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state); + } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name)); + } + else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type)); +} + + void Connection::doCatchupIoCallbacks() { // We need to process IO callbacks during the catch-up phase in // order to service asynchronous completions for messages diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 920c4937db..26514c76e2 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -200,6 +200,8 @@ class Connection : const std::string& instance); void config(const std::string& encoded); + void internalState(const std::string& type, const std::string& name, + const framing::FieldTable& state); void setSecureConnection ( broker::SecureConnection * sc ); diff --git a/cpp/src/qpid/cluster/CredentialsExchange.cpp b/cpp/src/qpid/cluster/CredentialsExchange.cpp index 0fafc521cd..416a3636e9 100644 --- a/cpp/src/qpid/cluster/CredentialsExchange.cpp +++ b/cpp/src/qpid/cluster/CredentialsExchange.cpp @@ -62,7 +62,8 @@ bool CredentialsExchange::check(MemberId member) { return valid; } -void CredentialsExchange::route(broker::Deliverable& msg, const string& /*routingKey*/, const framing::FieldTable* args) { +void CredentialsExchange::route(broker::Deliverable& msg) { + const framing::FieldTable* args = msg.getMessage().getApplicationHeaders(); sys::Mutex::ScopedLock l(lock); const broker::ConnectionState* connection = static_cast<const broker::ConnectionState*>(msg.getMessage().getPublisher()); diff --git a/cpp/src/qpid/cluster/CredentialsExchange.h b/cpp/src/qpid/cluster/CredentialsExchange.h index 90fd188271..74cf8350a6 100644 --- a/cpp/src/qpid/cluster/CredentialsExchange.h +++ b/cpp/src/qpid/cluster/CredentialsExchange.h @@ -50,7 +50,7 @@ class CredentialsExchange : public broker::Exchange bool check(MemberId member); /** Throw an exception if the calling connection is not the cluster user. Store credentials in msg. */ - void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + void route(broker::Deliverable& msg); // Exchange overrides std::string getType() const; diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp index 43ec27cf2c..87202a887c 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -80,7 +80,7 @@ bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, con return queues.find(queue) != queues.end(); } -void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { +void FailoverExchange::route(Deliverable&) { QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); } diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h index c3e50c6929..5ac734a7ac 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.h +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -54,7 +54,7 @@ class FailoverExchange : public broker::Exchange bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); - void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); + void route(broker::Deliverable& msg); private: void sendUpdate(const boost::shared_ptr<broker::Queue>&); diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index eb65005a9e..fc53d1076b 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -30,9 +30,9 @@ namespace qpid { namespace cluster { using namespace std; -using namespace boost; using namespace framing::cluster; using namespace framing; +using boost::optional; InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) : self(self_), completed(), resendNeeded(), size(size_) diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 95c64ff060..20684fd8a7 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -57,6 +57,7 @@ #include "qpid/framing/ClusterConnectionShadowReadyBody.h" #include "qpid/framing/ClusterConnectionSessionStateBody.h" #include "qpid/framing/ClusterConnectionConsumerStateBody.h" +#include "qpid/framing/FieldValue.h" #include "qpid/framing/enum.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/framing/TypeCode.h" @@ -687,7 +688,15 @@ void UpdateClient::updateLinks() { void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { QPID_LOG(debug, *this << " updating link " << link->getHost() << ":" << link->getPort()); - ClusterConnectionProxy(session).config(encode(*link)); + ClusterConnectionProxy(session).config(encode(*link)); // push the configuration + // now push the current state + framing::FieldTable state; + link->getState(state); + std::ostringstream os; + os << qpid::Address(link->getTransport(), link->getHost(), link->getPort()); + ClusterConnectionProxy(session).internalState(std::string("link"), + os.str(), + state); } void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp index e5cd82e3d3..31d96c67ca 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ b/cpp/src/qpid/cluster/UpdateDataExchange.cpp @@ -40,9 +40,9 @@ UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : Exchange(EXCHANGE_NAME, &cluster) {} -void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, - const qpid::framing::FieldTable* ) +void UpdateDataExchange::route(broker::Deliverable& msg) { + const std::string& routingKey = msg.getMessage().getRoutingKey(); std::string data = msg.getMessage().getFrames().getContent(); if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data; else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data; diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h index d2f6c35ad0..f79430f111 100644 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ b/cpp/src/qpid/cluster/UpdateDataExchange.h @@ -50,8 +50,7 @@ class UpdateDataExchange : public broker::Exchange UpdateDataExchange(Cluster& parent); - void route(broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); + void route(broker::Deliverable& msg); // Not implemented std::string getType() const { return EXCHANGE_TYPE; } |
