diff options
author | Alan Conway <aconway@apache.org> | 2010-07-05 20:12:08 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-07-05 20:12:08 +0000 |
commit | 74f973f9b8aaa0aedc9e7c3b2505357aae8e614a (patch) | |
tree | 8ad8de6773a48aab5121ac4152fb8624758b5087 /cpp/src | |
parent | 3c57e4b18865b717404ea41efbd4b80516a92a33 (diff) | |
download | qpid-python-74f973f9b8aaa0aedc9e7c3b2505357aae8e614a.tar.gz |
Defer delivery of messages in cluster-unsafe context.
Messages enqueued in a cluster-safe context are synchronized across
the cluster. However some messages are delivered in a cluster-unsafe
context, for example raising a link established event occurs the
connection thread of the establishing connection.
This fix deferrs such messages by multicasting them so they can be
re-delived in a cluster safe context.
See https://bugzilla.redhat.com/show_bug.cgi?id=611543
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960681 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Broker.h | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 35 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 10 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 16 |
6 files changed, 79 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 5b1b5235ec..4c2bd9c5bf 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -162,7 +162,8 @@ Broker::Broker(const Broker::Options& conf) : clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), - getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), + deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); @@ -492,6 +493,10 @@ Broker::getKnownBrokersImpl() return knownBrokers; } +bool Broker::deferDeliveryImpl(const std::string& , + const boost::intrusive_ptr<Message>& ) +{ return false; } + void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { clusterTimer = t; } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 10408867dc..6636b5d912 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -69,6 +69,7 @@ struct Url; namespace broker { class ExpiryPolicy; +class Message; static const uint16_t DEFAULT_PORT=5672; @@ -168,6 +169,8 @@ public: QueueEvents queueEvents; std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); + bool deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<Message>& msg); std::string federationTag; bool recovery; bool clusterUpdatee; @@ -273,6 +276,16 @@ public: management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } ConnectionCounter& getConnectionCounter() {return connectionCounter;} + + /** + * Never true in a stand-alone broker. In a cluster, return true + * to defer delivery of messages deliveredg in a cluster-unsafe + * context. + *@return true if delivery of a message should be deferred. + */ + boost::function<bool (const std::string& queue, + const boost::intrusive_ptr<Message>& msg)> deferDelivery; + }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dd077aa564..40ef6052a0 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -142,6 +142,9 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg) } void Queue::deliver(boost::intrusive_ptr<Message> msg){ + // Check for deferred delivery in a cluster. + if (broker && broker->deferDelivery(name, msg)) + return; if (msg->isImmediate() && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg); diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 7eb0798914..ec9ec30880 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -137,6 +137,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/Message.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SignalHandler.h" #include "qpid/framing/AMQFrame.h" @@ -154,6 +156,7 @@ #include "qpid/framing/ClusterConnectionAnnounceBody.h" #include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterTimerWakeupBody.h" +#include "qpid/framing/ClusterDeliverToQueueBody.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" @@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { } void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); } - void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } - + void deliverToQueue(const std::string& queue, const std::string& message) { + cluster.deliverToQueue(queue, message, l); + } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; @@ -310,6 +314,7 @@ void Cluster::initialize() { else myUrl = settings.url; broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.bypassOff(); @@ -1099,4 +1104,30 @@ bool Cluster::isElder() const { return elder; } +void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l) +{ + broker::Queue::shared_ptr q = broker.getQueues().find(queue); + if (!q) { + QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue); + leave(l); + } + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + boost::intrusive_ptr<broker::Message> msg(new broker::Message); + msg->decodeHeader(buf); + msg->decodeContent(buf); + q->deliver(msg); +} + +bool Cluster::deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<broker::Message>& msg) +{ + if (isClusterSafe()) return false; + std::string message; + message.resize(msg->encodedSize()); + framing::Buffer buf(const_cast<char*>(message.data()), message.size()); + msg->encode(buf); + mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self); + return true; +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 84dee27e94..5668d04996 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -54,6 +54,10 @@ namespace qpid { +namespace broker { +class Message; +} + namespace framing { class AMQBody; class Uuid; @@ -124,6 +128,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Generates a log message for debugging purposes. std::string debugSnapshot(); + // Defer messages delivered in an unsafe context by multicasting. + bool deferDeliveryImpl(const std::string& queue, + const boost::intrusive_ptr<broker::Message>& msg); + private: typedef sys::Monitor::ScopedLock Lock; @@ -173,8 +181,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); void timerWakeup(const MemberId&, const std::string& name, Lock&); void timerDrop(const MemberId&, const std::string& name, Lock&); - void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); + void deliverToQueue(const std::string& queue, const std::string& message, Lock&); // Helper functions ConnectionPtr getConnection(const EventFrame&, Lock&); diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 79001585f2..46bef2b3c2 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -157,7 +157,21 @@ acl allow all all self.fail("Expected exception") except messaging.exceptions.NotFound: pass - + def test_link_events(self): + """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" + args = ["--mgmt-pub-interval", 1] # Publish management information every second. + broker1 = self.cluster(1, args)[0] + broker2 = self.cluster(1, args)[0] + qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING) + qr = self.popen(["qpid-route", "route", "add", + broker1.host_port(), broker2.host_port(), + "amq.fanout", "key" + ], EXPECT_EXIT_OK) + # Look for link event in printevents output. + retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out"))) + broker1.ready() + broker2.ready() + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): |