From b849efb083c88de6c1932d0f7a87a7c500dbd3d6 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 23 Jan 2009 21:55:15 +0000 Subject: Use special management ids for objects used in state transfer to new members. This prevents the ids getting out of sync across the cluster and allows management methods to be used reliably. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737203 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/ClusterPlugin.cpp | 56 ++++++++++++++++++++++++++++++++++ cpp/src/qpid/cluster/Connection.cpp | 4 ++- cpp/src/qpid/cluster/Connection.h | 3 ++ cpp/src/qpid/cluster/DumpClient.cpp | 2 +- 4 files changed, 63 insertions(+), 2 deletions(-) (limited to 'cpp/src/qpid/cluster') diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 6e1d275162..79c34d6873 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,13 +21,21 @@ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" +#include "qpid/cluster/DumpClient.h" #include "qpid/broker/Broker.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include "qpid/shared_ptr.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementBroker.h" +#include "qpid/management/IdAllocator.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/SessionState.h" + #include #include @@ -36,6 +44,9 @@ namespace cluster { using namespace std; using broker::Broker; +using management::IdAllocator; +using management::ManagementAgent; +using management::ManagementBroker; struct ClusterValues { string name; @@ -76,6 +87,46 @@ struct ClusterOptions : public Options { } }; +struct DumpClientIdAllocator : management::IdAllocator +{ + qpid::sys::AtomicValue sequence; + + DumpClientIdAllocator() : sequence(0x4000000000000000LL) {} + + uint64_t getIdFor(management::Manageable* m) + { + if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) { + return ++sequence; + } else { + return 0; + } + } + + bool isDumpQueue(management::Manageable* manageable) + { + qpid::broker::Queue* queue = dynamic_cast(manageable); + return queue && queue->getName() == DumpClient::DUMP; + } + + bool isDumpExchange(management::Manageable* manageable) + { + qpid::broker::Exchange* exchange = dynamic_cast(manageable); + return exchange && exchange->getName() == DumpClient::DUMP; + } + + bool isDumpSession(management::Manageable* manageable) + { + broker::SessionState* session = dynamic_cast(manageable); + return session && session->getId().getName() == DumpClient::DUMP; + } + + bool isDumpBinding(management::Manageable* manageable) + { + broker::Exchange::Binding* binding = dynamic_cast(manageable); + return binding && binding->queue->getName() == DumpClient::DUMP; + } +}; + struct ClusterPlugin : public Plugin { ClusterValues values; @@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin { boost::shared_ptr( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); broker->getExchanges().registerExchange(cluster->getFailoverExchange()); + ManagementBroker* mgmt = dynamic_cast(ManagementAgent::Singleton::getInstance()); + if (mgmt) { + std::auto_ptr allocator(new DumpClientIdAllocator()); + mgmt->setAllocator(allocator); + } } void earlyInitialize(Plugin::Target&) {} diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index ac4b9dcdf2..d05baffe3a 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -69,7 +69,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0), + connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0), expectProtocolHeader(isLink) { init(); } @@ -396,5 +396,7 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } +qpid::sys::AtomicValue Connection::catchUpId(0x5000000000000000LL); + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 5d46b7e81d..29dee5eda4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -31,6 +31,7 @@ #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/framing/FrameDecoder.h" @@ -173,6 +174,8 @@ class Connection : boost::shared_ptr txBuffer; int readCredit; bool expectProtocolHeader; + + static qpid::sys::AtomicValue catchUpId; friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 3f3212470d..00328eb310 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -94,7 +94,7 @@ DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url done(ok), failed(fail) { connection.open(url); - session = connection.newSession("dump_shared"); + session = connection.newSession(DUMP); } DumpClient::~DumpClient() {} -- cgit v1.2.1