summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp32
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h10
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp75
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h8
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py57
8 files changed, 153 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 0851236f4a..7026dc7aa5 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -48,8 +48,6 @@
#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
-#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -1112,21 +1110,12 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange));
}
- std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate);
+ std::pair<Queue::shared_ptr, bool> result =
+ queues.declare(name, settings, alternate, false/*recovering*/,
+ owner, connectionId, userId);
if (result.second) {
//add default binding:
result.first->bind(exchanges.getDefault(), name);
-
- if (managementAgent.get()) {
- //TODO: debatable whether we should raise an event here for
- //create when this is a 'declare' event; ideally add a create
- //event instead?
- managementAgent->raiseEvent(
- _qmf::EventQueueDeclare(connectionId, userId, name,
- settings.durable, owner, settings.autodelete, alternateExchange,
- settings.asMap(),
- "created"));
- }
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << userId
<< " rhost:" << connectionId
@@ -1150,17 +1139,14 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId,
if (check) check(queue);
if (acl)
acl->recordDestroyQueue(name);
- queues.destroy(name);
+ queues.destroy(name, connectionId, userId);
queue->destroyed();
} else {
throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name));
}
-
- if (managementAgent.get())
- managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name));
QPID_LOG_CAT(debug, model, "Delete queue. name:" << name
- << " user:" << userId
- << " rhost:" << connectionId
+ << " user:" << userId
+ << " rhost:" << connectionId
);
}
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index 3521e08325..ed9f01c8b2 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -23,10 +23,14 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/Exchange.h"
#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
+#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include <sstream>
#include <assert.h>
+namespace _qmf = qmf::org::apache::qpid::broker;
using namespace qpid::broker;
using namespace qpid::sys;
using std::string;
@@ -44,7 +48,10 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
bool recovering/*true if this declare is a
result of recovering queue
definition from persistent
- record*/)
+ record*/,
+ const OwnershipToken* owner,
+ std::string connectionId,
+ std::string userId)
{
std::pair<Queue::shared_ptr, bool> result;
{
@@ -66,12 +73,25 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings,
} else {
result = std::pair<Queue::shared_ptr, bool>(i->second, false);
}
+ // NOTE: raiseEvent must be called with the lock held in order to
+ // ensure management events are generated in the correct order.
+ if (getBroker() && getBroker()->getManagementAgent() && connectionId.size() && userId.size()) {
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDeclare(
+ connectionId, userId, name,
+ settings.durable, owner, settings.autodelete,
+ alternate ? alternate->getName() : string(),
+ settings.asMap(),
+ result.second ? "created" : "existing"));
+ }
}
if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first);
return result;
}
-void QueueRegistry::destroy(const string& name) {
+void QueueRegistry::destroy(
+ const string& name, const string& connectionId, const string& userId)
+{
Queue::shared_ptr q;
{
qpid::sys::RWlock::ScopedWlock locker(lock);
@@ -79,6 +99,14 @@ void QueueRegistry::destroy(const string& name) {
if (i != queues.end()) {
q = i->second;
queues.erase(i);
+ if (getBroker() && getBroker()->getManagementAgent() &&
+ connectionId.size() && userId.size())
+ {
+ // NOTE: raiseEvent must be called with the lock held in order to
+ // ensure management events are generated in the correct order.
+ getBroker()->getManagementAgent()->raiseEvent(
+ _qmf::EventQueueDelete(connectionId, userId, name));
+ }
}
}
if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q);
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index 7fce90c679..0eede36f3f 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -59,7 +59,9 @@ class QueueRegistry : QueueFactory {
const std::string& name,
const QueueSettings& settings,
boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(),
- bool recovering = false);
+ bool recovering = false,
+ const OwnershipToken* owner = 0,
+ std::string connectionId=std::string(), std::string userId=std::string());
/**
* Destroy the named queue.
@@ -73,7 +75,11 @@ class QueueRegistry : QueueFactory {
* subsequent calls to find or declare with the same name.
*
*/
- QPID_BROKER_EXTERN void destroy(const std::string& name);
+ QPID_BROKER_EXTERN void destroy(
+ const std::string& name,
+ const std::string& connectionId=std::string(),
+ const std::string& userId=std::string());
+
template <class Test> bool destroyIf(const std::string& name, Test test)
{
if (test()) {
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 24c64bc521..0cf55d06e6 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -318,11 +318,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
if (exclusive && queue->setExclusiveOwner(&session)) {
exclusiveQueues.push_back(queue);
}
- ManagementAgent* agent = getBroker().getManagementAgent();
- if (agent)
- agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
- "existing"));
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << getConnection().getUserId()
<< " rhost:" << getConnection().getUrl()
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index c3d0598249..1358baf0e1 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -123,7 +123,7 @@ void HaBroker::initialize() {
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+ QPID_LOG(notice, logPrefix << "Shut down");
broker.getConnectionObservers().remove(observer);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 8e19b68284..7b8808c0a0 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -34,6 +34,7 @@
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/AclModule.h"
#include "qpid/types/Variant.h"
@@ -62,23 +63,23 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
- const size_t qmfV1BufferSize(65536);
- const string defaultVendorName("vendor");
- const string defaultProductName("product");
+const size_t qmfV1BufferSize(65536);
+const string defaultVendorName("vendor");
+const string defaultProductName("product");
- // Create a valid binding key substring by
- // replacing all '.' chars with '_'
- const string keyifyNameStr(const string& name)
- {
- string n2 = name;
+// Create a valid binding key substring by
+// replacing all '.' chars with '_'
+const string keyifyNameStr(const string& name)
+{
+ string n2 = name;
- size_t pos = n2.find('.');
- while (pos != n2.npos) {
- n2.replace(pos, 1, "_");
- pos = n2.find('.', pos);
- }
- return n2;
+ size_t pos = n2.find('.');
+ while (pos != n2.npos) {
+ n2.replace(pos, 1, "_");
+ pos = n2.find('.', pos);
}
+ return n2;
+}
struct ScopedManagementContext
{
@@ -166,6 +167,9 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t
broker = _broker;
threadPoolSize = _threads;
ManagementObject::maxThreads = threadPoolSize;
+ sendQueue.reset(
+ new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller()));
+ sendQueue->start();
// Get from file or generate and save to file.
if (dataDir.empty())
@@ -235,13 +239,13 @@ void ManagementAgent::setName(const string& vendor, const string& product, const
} else
inst = instance;
- name_address = vendor + ":" + product + ":" + inst;
- attrMap["_instance"] = inst;
- attrMap["_name"] = name_address;
+ name_address = vendor + ":" + product + ":" + inst;
+ attrMap["_instance"] = inst;
+ attrMap["_name"] = name_address;
- vendorNameKey = keyifyNameStr(vendor);
- productNameKey = keyifyNameStr(product);
- instanceNameKey = keyifyNameStr(inst);
+ vendorNameKey = keyifyNameStr(vendor);
+ productNameKey = keyifyNameStr(product);
+ instanceNameKey = keyifyNameStr(inst);
}
@@ -541,14 +545,9 @@ void ManagementAgent::sendBuffer(Buffer& buf,
dp->setRoutingKey(routingKey);
transfer->getFrames().append(content);
-
Message msg(transfer, transfer);
msg.setIsManagementMessage(true);
-
- DeliverableMessage deliverable (msg, 0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
+ sendQueue->push(make_pair(exchange, msg));
buf.reset();
}
@@ -617,10 +616,7 @@ void ManagementAgent::sendBuffer(const string& data,
msg.setIsManagementMessage(true);
msg.computeExpiration(broker->getExpiryPolicy());
- DeliverableMessage deliverable (msg,0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
+ sendQueue->push(make_pair(exchange, msg));
}
@@ -1418,7 +1414,7 @@ void ManagementAgent::handleMethodRequest (const string& body, const string& rte
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, viaLocal);
+ Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
}
@@ -2123,7 +2119,7 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg)
qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
@@ -2939,6 +2935,21 @@ bool ManagementAgent::moveDeletedObjects() {
return !deleteList.empty();
}
+ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
+ const EventQueue::Batch& batch)
+{
+ EventQueue::Batch::const_iterator i;
+ for (i = batch.begin(); i != batch.end(); ++i) {
+ DeliverableMessage deliverable (i->second, 0);
+ try {
+ i->first->route(deliverable);
+ } catch(exception& e) {
+ QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what());
+ }
+ }
+ return i;
+}
+
namespace {
QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index fba733a984..7f1a2e3e66 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -34,6 +34,7 @@
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qmf/org/apache/qpid/broker/Memory.h"
#include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/ResizableBuffer.h>
@@ -340,6 +341,11 @@ private:
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
+ // Pollable queue to serialize event messages
+ typedef std::pair<boost::shared_ptr<broker::Exchange>,
+ broker::Message> ExchangeAndMessage;
+ typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
+
//
// Memory statistics object
//
@@ -350,6 +356,7 @@ private:
void deleteObjectNow(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
void sendBuffer(framing::Buffer& buf,
qpid::broker::Exchange::shared_ptr exchange,
const std::string& routingKey);
@@ -417,6 +424,7 @@ private:
std::string summarizeAgents();
void debugSnapshot(const char* title);
+ std::auto_ptr<EventQueue> sendQueue;
};
void setManagementExecutionContext(const qpid::broker::ConnectionState*);
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2c7e164c42..3b69e3de33 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -939,6 +939,63 @@ class LongTests(HaBrokerTest):
if unexpected_dead:
raise Exception("Brokers not running: %s"%unexpected_dead)
+ def test_qmf_order(self):
+ """QPID 4402: HA QMF events can be out of order.
+ This test mimics the test described in the JIRA. Two threads repeatedly
+ declare the same auto-delete queue and close their connection.
+ """
+ broker = Broker(self)
+ class Receiver(Thread):
+ def __init__(self, qname):
+ Thread.__init__(self)
+ self.qname = qname
+ self.stopped = False
+
+ def run(self):
+ while not self.stopped:
+ self.connection = broker.connect()
+ try:
+ self.connection.session().receiver(
+ self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
+ except NotFound: pass # Can occur occasionally, not an error.
+ try: self.connection.close()
+ except: pass
+
+ class QmfObject(object):
+ """Track existance of an object and validate QMF events"""
+ def __init__(self, type_name, name_field, name):
+ self.type_name, self.name_field, self.name = type_name, name_field, name
+ self.exists = False
+
+ def qmf_event(self, event):
+ content = event.content[0]
+ event_type = content['_schema_id']['_class_name']
+ values = content['_values']
+ if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
+ disp = values['disp']
+ log.debug("Event %s: disp=%s exists=%s"%(
+ event_type, values['disp'], self.exists))
+ if self.exists: assert values['disp'] == 'existing'
+ else: assert values['disp'] == 'created'
+ self.exists = True
+ elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
+ log.debug("Event %s: exists=%s"%(event_type, self.exists))
+ assert self.exists
+ self.exists = False
+
+ # Verify order of QMF events.
+ helper = EventHelper()
+ r = broker.connect().session().receiver(helper.eventAddress())
+ threads = [Receiver("qq"), Receiver("qq")]
+ for t in threads: t.start()
+ queue = QmfObject("queue", "qName", "qq")
+ finish = time.time() + self.duration()
+ try:
+ while time.time() < finish:
+ queue.qmf_event(r.fetch())
+ finally:
+ for t in threads: t.stopped = True; t.join()
+
class RecoveryTests(HaBrokerTest):
"""Tests for recovery after a failure."""