summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--cpp/src/qpid/broker/Broker.h13
-rw-r--r--cpp/src/qpid/broker/Queue.cpp3
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp35
-rw-r--r--cpp/src/qpid/cluster/Cluster.h10
-rwxr-xr-xcpp/src/tests/cluster_tests.py16
-rw-r--r--cpp/xml/cluster.xml6
-rw-r--r--python/qpid/brokertest.py16
-rwxr-xr-xtools/src/py/qpid-printevents4
9 files changed, 95 insertions, 15 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 5b1b5235ec..4c2bd9c5bf 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -162,7 +162,8 @@ Broker::Broker(const Broker::Options& conf) :
clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
connectionCounter(conf.maxConnections),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
+ deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
{
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -492,6 +493,10 @@ Broker::getKnownBrokersImpl()
return knownBrokers;
}
+bool Broker::deferDeliveryImpl(const std::string& ,
+ const boost::intrusive_ptr<Message>& )
+{ return false; }
+
void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) {
clusterTimer = t;
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 10408867dc..6636b5d912 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -69,6 +69,7 @@ struct Url;
namespace broker {
class ExpiryPolicy;
+class Message;
static const uint16_t DEFAULT_PORT=5672;
@@ -168,6 +169,8 @@ public:
QueueEvents queueEvents;
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
+ bool deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<Message>& msg);
std::string federationTag;
bool recovery;
bool clusterUpdatee;
@@ -273,6 +276,16 @@ public:
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
ConnectionCounter& getConnectionCounter() {return connectionCounter;}
+
+ /**
+ * Never true in a stand-alone broker. In a cluster, return true
+ * to defer delivery of messages deliveredg in a cluster-unsafe
+ * context.
+ *@return true if delivery of a message should be deferred.
+ */
+ boost::function<bool (const std::string& queue,
+ const boost::intrusive_ptr<Message>& msg)> deferDelivery;
+
};
}}
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd077aa564..40ef6052a0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -142,6 +142,9 @@ bool Queue::isExcluded(boost::intrusive_ptr<Message>& msg)
}
void Queue::deliver(boost::intrusive_ptr<Message> msg){
+ // Check for deferred delivery in a cluster.
+ if (broker && broker->deferDelivery(name, msg))
+ return;
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 7eb0798914..ec9ec30880 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -137,6 +137,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/AMQFrame.h"
@@ -154,6 +156,7 @@
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterTimerWakeupBody.h"
+#include "qpid/framing/ClusterDeliverToQueueBody.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Helpers.h"
#include "qpid/log/Statement.h"
@@ -232,9 +235,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
-
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
-
+ void deliverToQueue(const std::string& queue, const std::string& message) {
+ cluster.deliverToQueue(queue, message, l);
+ }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
@@ -310,6 +314,7 @@ void Cluster::initialize() {
else
myUrl = settings.url;
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2);
broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.bypassOff();
@@ -1099,4 +1104,30 @@ bool Cluster::isElder() const {
return elder;
}
+void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l)
+{
+ broker::Queue::shared_ptr q = broker.getQueues().find(queue);
+ if (!q) {
+ QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue);
+ leave(l);
+ }
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ boost::intrusive_ptr<broker::Message> msg(new broker::Message);
+ msg->decodeHeader(buf);
+ msg->decodeContent(buf);
+ q->deliver(msg);
+}
+
+bool Cluster::deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg)
+{
+ if (isClusterSafe()) return false;
+ std::string message;
+ message.resize(msg->encodedSize());
+ framing::Buffer buf(const_cast<char*>(message.data()), message.size());
+ msg->encode(buf);
+ mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self);
+ return true;
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index 84dee27e94..5668d04996 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -54,6 +54,10 @@
namespace qpid {
+namespace broker {
+class Message;
+}
+
namespace framing {
class AMQBody;
class Uuid;
@@ -124,6 +128,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Generates a log message for debugging purposes.
std::string debugSnapshot();
+ // Defer messages delivered in an unsafe context by multicasting.
+ bool deferDeliveryImpl(const std::string& queue,
+ const boost::intrusive_ptr<broker::Message>& msg);
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -173,8 +181,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
void timerWakeup(const MemberId&, const std::string& name, Lock&);
void timerDrop(const MemberId&, const std::string& name, Lock&);
-
void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
+ void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
// Helper functions
ConnectionPtr getConnection(const EventFrame&, Lock&);
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 79001585f2..46bef2b3c2 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -157,7 +157,21 @@ acl allow all all
self.fail("Expected exception")
except messaging.exceptions.NotFound: pass
-
+ def test_link_events(self):
+ """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
+ args = ["--mgmt-pub-interval", 1] # Publish management information every second.
+ broker1 = self.cluster(1, args)[0]
+ broker2 = self.cluster(1, args)[0]
+ qp = self.popen(["qpid-printevents", broker1.host_port()], EXPECT_RUNNING)
+ qr = self.popen(["qpid-route", "route", "add",
+ broker1.host_port(), broker2.host_port(),
+ "amq.fanout", "key"
+ ], EXPECT_EXIT_OK)
+ # Look for link event in printevents output.
+ retry(lambda: find_in_file("brokerLinkUp", qp.outfile("out")))
+ broker1.ready()
+ broker2.ready()
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 30cd159dd3..ecd4515558 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -110,6 +110,12 @@
<field name="shutdown-id" type="uuid"/>
</control>
+ <!-- Deliver a message to a queue -->
+ <control name="deliver-to-queue" code="0x21">
+ <field name="queue" type="str16"/>
+ <field name="message" type="vbin32"/>
+ </control>
+
</class>
<!-- Controls associated with a specific connection. -->
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index 011254d21d..5473bd588e 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -251,6 +251,12 @@ def checkenv(name):
if not value: raise Exception("Environment variable %s is not set" % name)
return value
+def find_in_file(str, filename):
+ if not os.path.exists(filename): return False
+ f = open(filename)
+ try: return str in f.read()
+ finally: f.close()
+
class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
@@ -367,15 +373,7 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
if self._log_ready: return True
- if not os.path.exists(self.log): return False
- f = open(self.log)
- try:
- for l in f:
- if "notice Broker running" in l:
- self._log_ready = True
- return True
- return False
- finally: f.close()
+ self._log_ready = find_in_file("notice Broker running", self.log)
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
diff --git a/tools/src/py/qpid-printevents b/tools/src/py/qpid-printevents
index 0c1b618a1f..ed2155ad22 100755
--- a/tools/src/py/qpid-printevents
+++ b/tools/src/py/qpid-printevents
@@ -29,13 +29,15 @@ from qmf.console import Console, Session
class EventConsole(Console):
def event(self, broker, event):
print event
+ sys.stdout.flush()
def brokerConnected(self, broker):
print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+ sys.stdout.flush()
def brokerDisconnected(self, broker):
print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
-
+ sys.stdout.flush()
##
## Main Program