summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp35
1 files changed, 33 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7eb0798914..ec9ec30880 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,6 +137,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -154,6 +156,7 @@
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
-
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
-
+ void deliverToQueue(const std::string& queue, const std::string& message) {
+ cluster.deliverToQueue(queue, message, l);
+ }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -310,6 +314,7 @@ void Cluster::initialize() {
else
myUrl = settings.url;
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.bypassOff();
@@ -1099,4 +1104,30 @@ bool Cluster::isElder() const {
return elder;
}
+void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
+{
+ broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+ if (!q) {
+ QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
+ leave(l);
+ }
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ q->deliver(msg);
+}
+
+bool Cluster::deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg)
+{
+ if (isClusterSafe()) return false;
+ std::string message;
+ message.resize(msg->encodedSize());
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ msg->encode(buf);
+ mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
+ return true;
+}
+
}} // namespace qpid::cluster