diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7eb0798914..ec9ec30880 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -137,6 +137,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Message.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -154,6 +156,7 @@ #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterTimerWakeupBody.h" +#include "qpid/framing/ClusterDeliverToQueueBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } - void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } - + void deliverToQueue(const std::string& queue, const std::string& message) { + cluster.deliverToQueue(queue, message, l); + } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -310,6 +314,7 @@ void Cluster::initialize() { else myUrl = settings.url; broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.bypassOff(); @@ -1099,4 +1104,30 @@ bool Cluster::isElder() const { return elder; } +void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l) +{ + broker::Queue::shared_ptr q = broker.getQueues().find(queue); + if (!q) { + QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue); + leave(l); + } + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + boost::intrusive_ptr<broker::Message> msg(new broker::Message); + msg->decodeHeader(buf); + msg->decodeContent(buf); + q->deliver(msg); +} + +bool Cluster::deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<broker::Message>& msg) +{ + if (isClusterSafe()) return false; + std::string message; + message.resize(msg->encodedSize()); + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + msg->encode(buf); + mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self); + return true; +} + }} // namespace qpid::cluster |