summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--cpp/src/qpid/broker/Broker.h13
-rw-r--r--cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp35
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rwxr-xr-xcpp/src/tests/cluster_tests.py16
-rw-r--r--cpp/xml/cluster.xml6
7 files changed, 85 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 5b1b5235ec..4c2bd9c5bf 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -162,7 +162,8 @@ Broker::Broker(const Broker::Options& conf) :
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
+ deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
{
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -492,6 +493,10 @@ Broker::getKnownBrokersImpl()
return knownBrokers;
}
+bool Broker::deferDeliveryImpl(const std::string& ,
+ const boost::intrusive_ptr<Message>& )
+{ return false; }
+
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 10408867dc..6636b5d912 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -69,6 +69,7 @@ struct Url;
namespace broker {
class ExpiryPolicy;
+class Message;
static const uint16_t DEFAULT_PORT=5672;
@@ -168,6 +169,8 @@ public:
QueueEvents queueEvents;
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
+ bool deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<Message>& msg);
std::string federationTag;
bool recovery;
bool clusterUpdatee;
@@ -273,6 +276,16 @@ public:
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
+
+ /**
+ * Never true in a stand-alone broker. In a cluster, return true
+ * to defer delivery of messages deliveredg in a cluster-unsafe
+ * context.
+ *@return true if delivery of a message should be deferred.
+ */
+ boost::function<bool (const std::string& queue,
+ const boost::intrusive_ptr<Message>& msg)> deferDelivery;
+
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd077aa564..40ef6052a0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -142,6 +142,9 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message> msg){
+ // Check for deferred delivery in a cluster.
+ if (broker && broker->deferDelivery(name, msg))
+ return;
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7eb0798914..ec9ec30880 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,6 +137,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -154,6 +156,7 @@
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
-
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
-
+ void deliverToQueue(const std::string& queue, const std::string& message) {
+ cluster.deliverToQueue(queue, message, l);
+ }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -310,6 +314,7 @@ void Cluster::initialize() {
else
myUrl = settings.url;
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.bypassOff();
@@ -1099,4 +1104,30 @@ bool Cluster::isElder() const {
return elder;
}
+void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
+{
+ broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+ if (!q) {
+ QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
+ leave(l);
+ }
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ q->deliver(msg);
+}
+
+bool Cluster::deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg)
+{
+ if (isClusterSafe()) return false;
+ std::string message;
+ message.resize(msg->encodedSize());
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ msg->encode(buf);
+ mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
+ return true;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 84dee27e94..5668d04996 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -54,6 +54,10 @@
namespace qpid {
+namespace broker {
+class Message;
+}
+
namespace framing {
class AMQBody;
class Uuid;
@@ -124,6 +128,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Generates a log message for debugging purposes.
std::string debugSnapshot();
+ // Defer messages delivered in an unsafe context by multicasting.
+ bool deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg);
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -173,8 +181,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
-
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
+ void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 79001585f2..46bef2b3c2 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -157,7 +157,21 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
-
+ def test_link_events(self):
+ """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
+ args = ["--mgmt-pub-interval", 1] # Publish management information every second.
+ broker1 = self.cluster(1, args)[0]
+ broker2 = self.cluster(1, args)[0]
+ qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING)
+ qr = self.popen(["qpid-route", "route", "add",
+ broker1.host_port(), broker2.host_port(),
+ "amq.fanout", "key"
+ ], EXPECT_EXIT_OK)
+ # Look for link event in printevents output.
+ retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
+ broker1.ready()
+ broker2.ready()
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 30cd159dd3..ecd4515558 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -110,6 +110,12 @@
<field name="shutdown-id" type="uuid"/>
</control>
+ <!-- Deliver a message to a queue -->
+ <control name="deliver-to-queue" code="0x21">
+ <field name="queue" type="str16"/>
+ <field name="message" type="vbin32"/>
+ </control>
+
</class>
<!-- Controls associated with a specific connection. -->