diff options
author | Alan Conway <aconway@apache.org> | 2010-07-05 20:12:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-07-05 20:12:08 +0000 |
commit | 74f973f9b8aaa0aedc9e7c3b2505357aae8e614a (patch) | |
tree | 8ad8de6773a48aab5121ac4152fb8624758b5087 /cpp/src/qpid/cluster/Cluster.cpp | |
parent | 3c57e4b18865b717404ea41efbd4b80516a92a33 (diff) | |
download | qpid-python-74f973f9b8aaa0aedc9e7c3b2505357aae8e614a.tar.gz |
Defer delivery of messages in cluster-unsafe context.
Messages enqueued in a cluster-safe context are synchronized across
the cluster. However some messages are delivered in a cluster-unsafe
context, for example raising a link established event occurs the
connection thread of the establishing connection.
This fix deferrs such messages by multicasting them so they can be
re-delived in a cluster safe context.
See https://bugzilla.redhat.com/show_bug.cgi?id=611543
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960681 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7eb0798914..ec9ec30880 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -137,6 +137,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Message.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -154,6 +156,7 @@ #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterTimerWakeupBody.h" +#include "qpid/framing/ClusterDeliverToQueueBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } - void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } - + void deliverToQueue(const std::string& queue, const std::string& message) { + cluster.deliverToQueue(queue, message, l); + } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -310,6 +314,7 @@ void Cluster::initialize() { else myUrl = settings.url; broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.bypassOff(); @@ -1099,4 +1104,30 @@ bool Cluster::isElder() const { return elder; } +void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l) +{ + broker::Queue::shared_ptr q = broker.getQueues().find(queue); + if (!q) { + QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue); + leave(l); + } + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + boost::intrusive_ptr<broker::Message> msg(new broker::Message); + msg->decodeHeader(buf); + msg->decodeContent(buf); + q->deliver(msg); +} + +bool Cluster::deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<broker::Message>& msg) +{ + if (isClusterSafe()) return false; + std::string message; + message.resize(msg->encodedSize()); + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + msg->encode(buf); + mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self); + return true; +} + }} // namespace qpid::cluster |