diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/DumpClient.cpp | 31 |
3 files changed, 33 insertions, 15 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9c0b371066..1276a994ac 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -29,6 +29,8 @@ #include "qpid/broker/TxAccept.h" #include "qpid/broker/RecoveredEnqueue.h" #include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" #include "qpid/framing/enum.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AllInvoker.h" @@ -347,6 +349,17 @@ void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { semanticState().setAccumulatedAck(s); } +void Connection::exchange(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); + QPID_LOG(debug, cluster << " decoded exchange " << ex->getName()); +} + +void Connection::queue(const std::string& encoded) { + Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + broker::Queue::shared_ptr q = broker::Queue::decode(cluster.getBroker().getQueues(), buf); + QPID_LOG(debug, cluster << " decoded queue " << q->getName()); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 4d06848ae6..36476baa34 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -137,6 +137,10 @@ class Connection : void txEnd(); void accumulatedAck(const qpid::framing::SequenceSet&); + // Encoded queue/exchange replication. + void queue(const std::string& encoded); + void exchange(const std::string& encoded); + private: bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp index 18db83ba87..3a4f217721 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/DumpClient.cpp @@ -133,14 +133,20 @@ void DumpClient::run() { delete this; } +namespace { +template <class T> std::string encode(const T& t) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); + t.encode(buf); + return encoded; +} +} // namespace + void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { - session.exchangeDeclare( - ex->getName(), ex->getType(), - ex->getAlternate() ? ex->getAlternate()->getName() : std::string(), - arg::passive=false, - arg::durable=ex->isDurable(), - arg::autoDelete=false, - arg::arguments=ex->getArgs()); + QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName()); + ClusterConnectionProxy proxy(session); + proxy.exchange(encode(*ex)); } /** Bind a queue to the dump exchange and dump messges to it @@ -181,14 +187,9 @@ class MessageDumper { void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { - session.queueDeclare( - q->getName(), - q->getAlternateExchange() ? q->getAlternateExchange()->getName() : std::string(), - arg::passive=false, - arg::durable=q->isDurable(), - arg::exclusive=q->hasExclusiveConsumer(), - arg::autoDelete=q->isAutoDelete(), - arg::arguments=q->getSettings()); + QPID_LOG(debug, dumperId << " dumping queue " << q->getName()); + ClusterConnectionProxy proxy(session); + proxy.queue(encode(*q)); MessageDumper dumper(q->getName(), session); q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); |
