summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp56
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp2
4 files changed, 63 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 6e1d275162..79c34d6873 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -21,13 +21,21 @@
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/cluster/DumpClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include "qpid/shared_ptr.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionState.h"
+
#include <boost/utility/in_place_factory.hpp>
#include <boost/scoped_ptr.hpp>
@@ -36,6 +44,9 @@ namespace cluster {
using namespace std;
using broker::Broker;
+using management::IdAllocator;
+using management::ManagementAgent;
+using management::ManagementBroker;
struct ClusterValues {
string name;
@@ -76,6 +87,46 @@ struct ClusterOptions : public Options {
}
};
+struct DumpClientIdAllocator : management::IdAllocator
+{
+ qpid::sys::AtomicValue<uint64_t> sequence;
+
+ DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+
+ uint64_t getIdFor(management::Manageable* m)
+ {
+ if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+ return ++sequence;
+ } else {
+ return 0;
+ }
+ }
+
+ bool isDumpQueue(management::Manageable* manageable)
+ {
+ qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
+ return queue && queue->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpExchange(management::Manageable* manageable)
+ {
+ qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
+ return exchange && exchange->getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpSession(management::Manageable* manageable)
+ {
+ broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
+ return session && session->getId().getName() == DumpClient::DUMP;
+ }
+
+ bool isDumpBinding(management::Manageable* manageable)
+ {
+ broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
+ return binding && binding->queue->getName() == DumpClient::DUMP;
+ }
+};
+
struct ClusterPlugin : public Plugin {
ClusterValues values;
@@ -102,6 +153,11 @@ struct ClusterPlugin : public Plugin {
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
+ ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+ if (mgmt) {
+ std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+ mgmt->setAllocator(allocator);
+ }
}
void earlyInitialize(Plugin::Target&) {}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ac4b9dcdf2..d05baffe3a 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -69,7 +69,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+ connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
expectProtocolHeader(isLink)
{ init(); }
@@ -396,5 +396,7 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 5d46b7e81d..29dee5eda4 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -31,6 +31,7 @@
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/framing/FrameDecoder.h"
@@ -173,6 +174,8 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
int readCredit;
bool expectProtocolHeader;
+
+ static qpid::sys::AtomicValue<uint64_t> catchUpId;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 3f3212470d..00328eb310 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -94,7 +94,7 @@ DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url
done(ok), failed(fail)
{
connection.open(url);
- session = connection.newSession("dump_shared");
+ session = connection.newSession(DUMP);
}
DumpClient::~DumpClient() {}