summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp13
-rw-r--r--cpp/src/qpid/cluster/Connection.h4
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp31
3 files changed, 33 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9c0b371066..1276a994ac 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -29,6 +29,8 @@
#include "qpid/broker/TxAccept.h"
#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -347,6 +349,17 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
semanticState().setAccumulatedAck(s);
}
+void Connection::exchange(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf);
+ QPID_LOG(debug, cluster << " decoded exchange " << ex->getName());
+}
+
+void Connection::queue(const std::string& encoded) {
+ Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf);
+ QPID_LOG(debug, cluster << " decoded queue " << q->getName());
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 4d06848ae6..36476baa34 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -137,6 +137,10 @@ class Connection :
void txEnd();
void accumulatedAck(const qpid::framing::SequenceSet&);
+ // Encoded queue/exchange replication.
+ void queue(const std::string& encoded);
+ void exchange(const std::string& encoded);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 18db83ba87..3a4f217721 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -133,14 +133,20 @@ void DumpClient::run() {
delete this;
}
+namespace {
+template <class T> std::string encode(const T& t) {
+ std::string encoded;
+ encoded.resize(t.encodedSize());
+ framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size());
+ t.encode(buf);
+ return encoded;
+}
+} // namespace
+
void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
- session.exchangeDeclare(
- ex->getName(), ex->getType(),
- ex->getAlternate() ? ex->getAlternate()->getName() : std::string(),
- arg::passive=false,
- arg::durable=ex->isDurable(),
- arg::autoDelete=false,
- arg::arguments=ex->getArgs());
+ QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.exchange(encode(*ex));
}
/** Bind a queue to the dump exchange and dump messges to it
@@ -181,14 +187,9 @@ class MessageDumper {
void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
- session.queueDeclare(
- q->getName(),
- q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(),
- arg::passive=false,
- arg::durable=q->isDurable(),
- arg::exclusive=q->hasExclusiveConsumer(),
- arg::autoDelete=q->isAutoDelete(),
- arg::arguments=q->getSettings());
+ QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+ ClusterConnectionProxy proxy(session);
+ proxy.queue(encode(*q));
MessageDumper dumper(q->getName(), session);
q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));