summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-07-05 20:12:08 +0000
committerAlan Conway <aconway@apache.org>2010-07-05 20:12:08 +0000
commit74f973f9b8aaa0aedc9e7c3b2505357aae8e614a (patch)
tree8ad8de6773a48aab5121ac4152fb8624758b5087 /cpp/src/qpid/cluster/Cluster.cpp
parent3c57e4b18865b717404ea41efbd4b80516a92a33 (diff)
downloadqpid-python-74f973f9b8aaa0aedc9e7c3b2505357aae8e614a.tar.gz
Defer delivery of messages in cluster-unsafe context.
Messages enqueued in a cluster-safe context are synchronized across the cluster. However some messages are delivered in a cluster-unsafe context, for example raising a link established event occurs the connection thread of the establishing connection. This fix deferrs such messages by multicasting them so they can be re-delived in a cluster safe context. See https://bugzilla.redhat.com/show_bug.cgi?id=611543 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960681 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp35
1 files changed, 33 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7eb0798914..ec9ec30880 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,6 +137,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -154,6 +156,7 @@
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
-
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
-
+ void deliverToQueue(const std::string& queue, const std::string& message) {
+ cluster.deliverToQueue(queue, message, l);
+ }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -310,6 +314,7 @@ void Cluster::initialize() {
else
myUrl = settings.url;
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.bypassOff();
@@ -1099,4 +1104,30 @@ bool Cluster::isElder() const {
return elder;
}
+void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
+{
+ broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+ if (!q) {
+ QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
+ leave(l);
+ }
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ q->deliver(msg);
+}
+
+bool Cluster::deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg)
+{
+ if (isClusterSafe()) return false;
+ std::string message;
+ message.resize(msg->encodedSize());
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ msg->encode(buf);
+ mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
+ return true;
+}
+
}} // namespace qpid::cluster