summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ClusterPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterPlugin.cpp')
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp56
1 files changed, 56 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 6e1d275162..79c34d6873 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -21,13 +21,21 @@
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionState.h"
+
#include <boost/utility/in_place_factory.hpp>
#include <boost/scoped_ptr.hpp>
@@ -36,6 +44,9 @@ namespace cluster {
using namespace std;
using broker::Broker;
+using management::IdAllocator;
+using management::ManagementAgent;
+using management::ManagementBroker;
struct ClusterValues {
string name;
@@ -76,6 +87,46 @@ struct ClusterOptions : public Options {
}
};
+struct DumpClientIdAllocator : management::IdAllocator
+{
+ qpid::sys::AtomicValue<uint64_t> sequence;
+
+ DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+
+ uint64_t getIdFor(management::Manageable* m)
+ {
+ if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+ return ++sequence;
+ } else {
+ return 0;
+ }
+ }
+
+ bool isDumpQueue(management::Manageable* manageable)
+ {
+ qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
+ return queue && queue->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpExchange(management::Manageable* manageable)
+ {
+ qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
+ return exchange && exchange->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpSession(management::Manageable* manageable)
+ {
+ broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
+ return session && session->getId().getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpBinding(management::Manageable* manageable)
+ {
+ broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
+ return binding && binding->queue->getName() == DumpClient::DUMP;
+ }
+};
+
struct ClusterPlugin : public Plugin {
ClusterValues values;
@@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin {
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
+ ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+ if (mgmt) {
+ std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+ mgmt->setAllocator(allocator);
+ }
}
void earlyInitialize(Plugin::Target&) {}