diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 26 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 75 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 57 |
8 files changed, 153 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 0851236f4a..7026dc7aa5 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -48,8 +48,6 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qpid/amqp_0_10/Codecs.h" @@ -1112,21 +1110,12 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate); + std::pair<Queue::shared_ptr, bool> result = + queues.declare(name, settings, alternate, false/*recovering*/, + owner, connectionId, userId); if (result.second) { //add default binding: result.first->bind(exchanges.getDefault(), name); - - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent( - _qmf::EventQueueDeclare(connectionId, userId, name, - settings.durable, owner, settings.autodelete, alternateExchange, - settings.asMap(), - "created")); - } QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << userId << " rhost:" << connectionId @@ -1150,17 +1139,14 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, if (check) check(queue); if (acl) acl->recordDestroyQueue(name); - queues.destroy(name); + queues.destroy(name, connectionId, userId); queue->destroyed(); } else { throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); } - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); QPID_LOG_CAT(debug, model, "Delete queue. name:" << name - << " user:" << userId - << " rhost:" << connectionId + << " user:" << userId + << " rhost:" << connectionId ); } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 3521e08325..ed9f01c8b2 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -23,10 +23,14 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/Exchange.h" #include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/framing/reply_exceptions.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include <sstream> #include <assert.h> +namespace _qmf = qmf::org::apache::qpid::broker; using namespace qpid::broker; using namespace qpid::sys; using std::string; @@ -44,7 +48,10 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, bool recovering/*true if this declare is a result of recovering queue definition from persistent - record*/) + record*/, + const OwnershipToken* owner, + std::string connectionId, + std::string userId) { std::pair<Queue::shared_ptr, bool> result; { @@ -66,12 +73,25 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); } + // NOTE: raiseEvent must be called with the lock held in order to + // ensure management events are generated in the correct order. + if (getBroker() && getBroker()->getManagementAgent() && connectionId.size() && userId.size()) { + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDeclare( + connectionId, userId, name, + settings.durable, owner, settings.autodelete, + alternate ? alternate->getName() : string(), + settings.asMap(), + result.second ? "created" : "existing")); + } } if (getBroker() && result.second) getBroker()->getConfigurationObservers().queueCreate(result.first); return result; } -void QueueRegistry::destroy(const string& name) { +void QueueRegistry::destroy( + const string& name, const string& connectionId, const string& userId) +{ Queue::shared_ptr q; { qpid::sys::RWlock::ScopedWlock locker(lock); @@ -79,6 +99,14 @@ void QueueRegistry::destroy(const string& name) { if (i != queues.end()) { q = i->second; queues.erase(i); + if (getBroker() && getBroker()->getManagementAgent() && + connectionId.size() && userId.size()) + { + // NOTE: raiseEvent must be called with the lock held in order to + // ensure management events are generated in the correct order. + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDelete(connectionId, userId, name)); + } } } if (getBroker() && q) getBroker()->getConfigurationObservers().queueDestroy(q); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index 7fce90c679..0eede36f3f 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -59,7 +59,9 @@ class QueueRegistry : QueueFactory { const std::string& name, const QueueSettings& settings, boost::shared_ptr<Exchange> alternateExchange = boost::shared_ptr<Exchange>(), - bool recovering = false); + bool recovering = false, + const OwnershipToken* owner = 0, + std::string connectionId=std::string(), std::string userId=std::string()); /** * Destroy the named queue. @@ -73,7 +75,11 @@ class QueueRegistry : QueueFactory { * subsequent calls to find or declare with the same name. * */ - QPID_BROKER_EXTERN void destroy(const std::string& name); + QPID_BROKER_EXTERN void destroy( + const std::string& name, + const std::string& connectionId=std::string(), + const std::string& userId=std::string()); + template <class Test> bool destroyIf(const std::string& name, Test test) { if (test()) { diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 24c64bc521..0cf55d06e6 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -318,11 +318,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (exclusive && queue->setExclusiveOwner(&session)) { exclusiveQueues.push_back(queue); } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), - "existing")); QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index c3d0598249..1358baf0e1 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -123,7 +123,7 @@ void HaBroker::initialize() { } HaBroker::~HaBroker() { - QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo); + QPID_LOG(notice, logPrefix << "Shut down"); broker.getConnectionObservers().remove(observer); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 8e19b68284..7b8808c0a0 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -34,6 +34,7 @@ #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/AclModule.h" #include "qpid/types/Variant.h" @@ -62,23 +63,23 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { - const size_t qmfV1BufferSize(65536); - const string defaultVendorName("vendor"); - const string defaultProductName("product"); +const size_t qmfV1BufferSize(65536); +const string defaultVendorName("vendor"); +const string defaultProductName("product"); - // Create a valid binding key substring by - // replacing all '.' chars with '_' - const string keyifyNameStr(const string& name) - { - string n2 = name; +// Create a valid binding key substring by +// replacing all '.' chars with '_' +const string keyifyNameStr(const string& name) +{ + string n2 = name; - size_t pos = n2.find('.'); - while (pos != n2.npos) { - n2.replace(pos, 1, "_"); - pos = n2.find('.', pos); - } - return n2; + size_t pos = n2.find('.'); + while (pos != n2.npos) { + n2.replace(pos, 1, "_"); + pos = n2.find('.', pos); } + return n2; +} struct ScopedManagementContext { @@ -166,6 +167,9 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t broker = _broker; threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; + sendQueue.reset( + new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller())); + sendQueue->start(); // Get from file or generate and save to file. if (dataDir.empty()) @@ -235,13 +239,13 @@ void ManagementAgent::setName(const string& vendor, const string& product, const } else inst = instance; - name_address = vendor + ":" + product + ":" + inst; - attrMap["_instance"] = inst; - attrMap["_name"] = name_address; + name_address = vendor + ":" + product + ":" + inst; + attrMap["_instance"] = inst; + attrMap["_name"] = name_address; - vendorNameKey = keyifyNameStr(vendor); - productNameKey = keyifyNameStr(product); - instanceNameKey = keyifyNameStr(inst); + vendorNameKey = keyifyNameStr(vendor); + productNameKey = keyifyNameStr(product); + instanceNameKey = keyifyNameStr(inst); } @@ -541,14 +545,9 @@ void ManagementAgent::sendBuffer(Buffer& buf, dp->setRoutingKey(routingKey); transfer->getFrames().append(content); - Message msg(transfer, transfer); msg.setIsManagementMessage(true); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} + sendQueue->push(make_pair(exchange, msg)); buf.reset(); } @@ -617,10 +616,7 @@ void ManagementAgent::sendBuffer(const string& data, msg.setIsManagementMessage(true); msg.computeExpiration(broker->getExpiryPolicy()); - DeliverableMessage deliverable (msg,0); - try { - exchange->route(deliverable); - } catch(exception&) {} + sendQueue->push(make_pair(exchange, msg)); } @@ -1418,7 +1414,7 @@ void ManagementAgent::handleMethodRequest (const string& body, const string& rte if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, viaLocal); + Manageable::STATUS_FORBIDDEN, viaLocal); return; } } @@ -2123,7 +2119,7 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg) qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); const framing::MessageProperties* p = - transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; @@ -2939,6 +2935,21 @@ bool ManagementAgent::moveDeletedObjects() { return !deleteList.empty(); } +ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents( + const EventQueue::Batch& batch) +{ + EventQueue::Batch::const_iterator i; + for (i = batch.begin(); i != batch.end(); ++i) { + DeliverableMessage deliverable (i->second, 0); + try { + i->first->route(deliverable); + } catch(exception& e) { + QPID_LOG(warning, "ManagementAgent failed to route event: " << e.what()); + } + } + return i; +} + namespace { QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index fba733a984..7f1a2e3e66 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -34,6 +34,7 @@ #include "qmf/org/apache/qpid/broker/Agent.h" #include "qmf/org/apache/qpid/broker/Memory.h" #include "qpid/sys/MemStat.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/ResizableBuffer.h> @@ -340,6 +341,11 @@ private: typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; PendingDeletedObjsMap pendingDeletedObjs; + // Pollable queue to serialize event messages + typedef std::pair<boost::shared_ptr<broker::Exchange>, + broker::Message> ExchangeAndMessage; + typedef sys::PollableQueue<ExchangeAndMessage> EventQueue; + // // Memory statistics object // @@ -350,6 +356,7 @@ private: void deleteObjectNow(const ObjectId& oid); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch); void sendBuffer(framing::Buffer& buf, qpid::broker::Exchange::shared_ptr exchange, const std::string& routingKey); @@ -417,6 +424,7 @@ private: std::string summarizeAgents(); void debugSnapshot(const char* title); + std::auto_ptr<EventQueue> sendQueue; }; void setManagementExecutionContext(const qpid::broker::ConnectionState*); diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2c7e164c42..3b69e3de33 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -939,6 +939,63 @@ class LongTests(HaBrokerTest): if unexpected_dead: raise Exception("Brokers not running: %s"%unexpected_dead) + def test_qmf_order(self): + """QPID 4402: HA QMF events can be out of order. + This test mimics the test described in the JIRA. Two threads repeatedly + declare the same auto-delete queue and close their connection. + """ + broker = Broker(self) + class Receiver(Thread): + def __init__(self, qname): + Thread.__init__(self) + self.qname = qname + self.stopped = False + + def run(self): + while not self.stopped: + self.connection = broker.connect() + try: + self.connection.session().receiver( + self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}") + except NotFound: pass # Can occur occasionally, not an error. + try: self.connection.close() + except: pass + + class QmfObject(object): + """Track existance of an object and validate QMF events""" + def __init__(self, type_name, name_field, name): + self.type_name, self.name_field, self.name = type_name, name_field, name + self.exists = False + + def qmf_event(self, event): + content = event.content[0] + event_type = content['_schema_id']['_class_name'] + values = content['_values'] + if event_type == self.type_name+"Declare" and values[self.name_field] == self.name: + disp = values['disp'] + log.debug("Event %s: disp=%s exists=%s"%( + event_type, values['disp'], self.exists)) + if self.exists: assert values['disp'] == 'existing' + else: assert values['disp'] == 'created' + self.exists = True + elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name: + log.debug("Event %s: exists=%s"%(event_type, self.exists)) + assert self.exists + self.exists = False + + # Verify order of QMF events. + helper = EventHelper() + r = broker.connect().session().receiver(helper.eventAddress()) + threads = [Receiver("qq"), Receiver("qq")] + for t in threads: t.start() + queue = QmfObject("queue", "qName", "qq") + finish = time.time() + self.duration() + try: + while time.time() < finish: + queue.qmf_event(r.fetch()) + finally: + for t in threads: t.stopped = True; t.join() + class RecoveryTests(HaBrokerTest): """Tests for recovery after a failure.""" |
