diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-07-22 18:43:33 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2010-07-22 18:43:33 +0000 |
commit | fd4b59dff2ed6f050af6effd28d2c7a02351bf07 (patch) | |
tree | 5f108ef61aecfdf9e2781fb5c0faeb1866baef37 /cpp | |
parent | 1b78ab840e683b839ccc4e0bddeb643560c27d16 (diff) | |
download | qpid-python-fd4b59dff2ed6f050af6effd28d2c7a02351bf07.tar.gz |
QPID-2754: fix management object database locking problem.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966795 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 310 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 2 |
3 files changed, 208 insertions, 112 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 9e4e9665ac..e98de9602e 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -101,7 +101,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxV2ReplyObjs(100) { nextObjectId = 1; brokerBank = 1; @@ -428,9 +428,15 @@ void ManagementAgent::clientAdded (const string& routingKey) return; clientWasAdded = true; + std::list<std::string> rkeys; + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + rkeys.push_back(aIter->second->routingKey); + } + + while (rkeys.size()) { char localBuffer[16]; Buffer outBuffer(localBuffer, 16); uint32_t outLen; @@ -438,8 +444,9 @@ void ManagementAgent::clientAdded (const string& routingKey) encodeHeader(outBuffer, 'x'); outLen = outBuffer.getPosition(); outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, aIter->second->routingKey); - QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << aIter->second->routingKey); + sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); + QPID_LOG(trace, "SEND ConsoleAddedIndication to=" << rkeys.front()); + rkeys.pop_front(); } } @@ -473,6 +480,7 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) } // NOTE WELL: assumes userLock is held by caller (LH) +// NOTE EVEN WELLER: drops this lock when delivering the message!!! void ManagementAgent::sendBufferLH(Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, @@ -517,10 +525,12 @@ void ManagementAgent::sendBufferLH(Buffer& buf, exchange->route(deliverable, routingKey, 0); } catch(exception&) {} } + buf.reset(); } // NOTE WELL: assumes userLock is held by caller (LH) +// NOTE EVEN WELLER: drops this lock when delivering the message!!! void ManagementAgent::sendBufferLH(const string& data, const string& cid, const Variant::Map& headers, @@ -630,31 +640,46 @@ void ManagementAgent::periodicProcessing (void) clientWasAdded = false; // - // Process the entire object map. + // Process the entire object map. Remember: we drop the userLock each time we call + // sendBuffer(). This allows the managementObjects map to be altered during the + // sendBuffer() call, so always restart the search after a sendBuffer() call // - for (ManagementObjectMap::iterator baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); - baseIter++) { - ManagementObject* baseObject = baseIter->second; - uint32_t pcount = 0; - uint32_t scount = 0; - - // - // Skip until we find a base object requiring a sent message. - // - if (baseObject->getFlags() == 1 || - (!baseObject->getConfigChanged() && - !baseObject->getInstChanged() && - !baseObject->getForcePublish() && - !baseObject->isDeleted())) - continue; - + while (1) { Buffer msgBuffer(msgChars, BUFSIZE); Variant::List list_; + uint32_t pcount; + uint32_t scount; + uint32_t v2Objs; + ManagementObjectMap::iterator baseIter; + std::string packageName; + std::string className; + + for (baseIter = managementObjects.begin(); + baseIter != managementObjects.end(); + baseIter++) { + ManagementObject* baseObject = baseIter->second; + // + // Skip until we find a base object requiring processing... + // + if (baseObject->getFlags() == 0) { + packageName = baseObject->getPackageName(); + className = baseObject->getClassName(); + break; + } + } + + if (baseIter == managementObjects.end()) + break; // done - all objects processed + + pcount = scount = 0; + v2Objs = 0; + list_.clear(); + msgBuffer.reset(); for (ManagementObjectMap::iterator iter = baseIter; iter != managementObjects.end(); iter++) { + ManagementObject* baseObject = baseIter->second; ManagementObject* object = iter->second; bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { @@ -694,6 +719,7 @@ void ManagementAgent::periodicProcessing (void) object->mapEncodeValues(values, send_props, send_stats); map_["_values"] = values; list_.push_back(map_); + v2Objs++; } if (send_props) pcount++; @@ -703,8 +729,9 @@ void ManagementAgent::periodicProcessing (void) deleteList.push_back(pair<ObjectId, ManagementObject*>(iter->first, object)); object->setForcePublish(false); - if (qmf1Support && (msgBuffer.available() < HEADROOM)) - break; + if ((qmf1Support && (msgBuffer.available() < HEADROOM)) || + (qmf2Support && (v2Objs >= maxV2ReplyObjs))) + break; // have enough objects, send an indication... } } @@ -712,10 +739,10 @@ void ManagementAgent::periodicProcessing (void) if (qmf1Support) { contentSize = BUFSIZE - msgBuffer.available(); if (contentSize > 0) { - msgBuffer.reset(); stringstream key; - key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK QPID_LOG(trace, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); } } @@ -726,19 +753,18 @@ void ManagementAgent::periodicProcessing (void) if (content.length()) { stringstream key; Variant::Map headers; - key << "agent.ind.data." << baseObject->getPackageName() << "." << baseObject->getClassName(); - // key << "console.obj.1.0." << baseObject->getPackageName() << "." << baseObject->getClassName(); + key << "agent.ind.data." << packageName << "." << className; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); - QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(trace, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount << " len=" << content.length()); } } } - } + } // end processing updates for all objects // Delete flagged objects for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin(); @@ -805,26 +831,24 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) if (!object->isDeleted()) return; - if (qmf1Support) { + // since sendBufferLH drops the userLock, don't call it until we + // are done manipulating the object. #define DNOW_BUFSIZE 2048 - char msgChars[DNOW_BUFSIZE]; - uint32_t contentSize; - Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + char msgChars[DNOW_BUFSIZE]; + Buffer msgBuffer(msgChars, DNOW_BUFSIZE); + Variant::List list_; + stringstream v1key, v2key; + + if (qmf1Support) { string sBuf; + v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); encodeHeader(msgBuffer, 'c'); object->writeProperties(sBuf); msgBuffer.putRawData(sBuf); - contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - stringstream key; - key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); } if (qmf2Support) { - Variant::List list_; Variant::Map map_; Variant::Map values; @@ -836,10 +860,22 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) object->mapEncodeValues(values, true, false); map_["_values"] = values; list_.push_back(map_); + v2key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); + } - stringstream key; - key << "agent.ind.data." << object->getPackageName() << "." << object->getClassName(); + object = 0; + managementObjects.erase(oid); + + // object deleted, ok to drop lock now. + if (qmf1Support) { + uint32_t contentSize = msgBuffer.getPosition(); + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v1key.str()); + } + + if (qmf2Support) { Variant::Map headers; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; @@ -848,11 +884,9 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); - QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << key.str()); + sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); + QPID_LOG(trace, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } - - managementObjects.erase(oid); } void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence, @@ -1186,20 +1220,22 @@ void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) { QPID_LOG(trace, "RECV PackageQuery replyTo=" << replyToKey); + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; for (PackageMap::iterator pIter = packages.begin (); pIter != packages.end (); pIter++) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - encodeHeader (outBuffer, 'p', sequence); encodePackageIndication (outBuffer, pIter); - outLen = MA_BUFFER_SIZE - outBuffer.available (); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + if (outLen) { outBuffer.reset (); sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND PackageInd package=" << (*pIter).first << " to=" << replyToKey << " seq=" << sequence); + QPID_LOG(trace, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1227,25 +1263,32 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { - ClassMap cMap = pIter->second; + typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; + std::list<_ckeyType> classes; + ClassMap &cMap = pIter->second; for (ClassMap::iterator cIter = cMap.begin(); cIter != cMap.end(); - cIter++) - { - if (cIter->second.hasSchema()) - { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, pIter, cIter); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND ClassInd class=" << (*pIter).first << ":" << (*cIter).first.name << - "(" << Uuid((*cIter).first.hash) << ") to=" << replyToKey << " seq=" << sequence); + cIter++) { + if (cIter->second.hasSchema()) { + classes.push_back(make_pair(cIter->first, cIter->second.kind)); } } + + while (classes.size()) { + Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + + outLen = MA_BUFFER_SIZE - outBuffer.available(); + outBuffer.reset(); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); + classes.pop_front(); + } + } sendCommandCompleteLH(replyToKey, sequence); } @@ -1370,7 +1413,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r uint32_t outLen; encodeHeader(outBuffer, 'q'); - encodeClassIndication(outBuffer, pIter, cIter); + encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); @@ -1536,32 +1579,60 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe } string className (value->get<string>()); + std::list<ObjectId>matches; + // build up a set of all objects to be dumped for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { ManagementObject* object = iter->second; if (object->getClassName () == className) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + matches.push_back(object->getObjectId()); + } + } + + // send them (as sendBufferLH drops the userLock) + Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); + uint32_t outLen; + while (matches.size()) { + ObjectId objId = matches.front(); + ManagementObjectMap::iterator oIter = managementObjects.find( objId ); + if (oIter != managementObjects.end()) { + ManagementObject* object = oIter->second; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); if (!object->isDeleted()) { - string sBuf; - encodeHeader(outBuffer, 'g', sequence); - object->writeProperties(sBuf); - outBuffer.putRawData(sBuf); - sBuf.clear(); - object->writeStatistics(sBuf, true); - outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + string sProps, sStats; + object->writeProperties(sProps); + object->writeStatistics(sStats, true); + + size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. + if (len > MA_BUFFER_SIZE) { + QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); + } else { + if (outBuffer.available() < len) { // not enough room in current buffer, send it. + outLen = MA_BUFFER_SIZE - outBuffer.available (); + outBuffer.reset (); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. + } + encodeHeader(outBuffer, 'g', sequence); + outBuffer.putRawData(sProps); + outBuffer.putRawData(sStats); + } } } + matches.pop_front(); + } + + outLen = MA_BUFFER_SIZE - outBuffer.available (); + if (outLen) { + outBuffer.reset (); + sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + QPID_LOG(trace, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } sendCommandCompleteLH(replyToKey, sequence); @@ -1583,13 +1654,6 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo headers["qmf.opcode"] = "_query_response"; headers["qmf.content"] = "_data"; headers["qmf.agent"] = viaLocal ? "broker" : name_address; - headers["partial"] = Variant(); - - Variant::List list_; - Variant::Map map_; - Variant::Map values; - Variant::Map oidMap; - string content; /* * Unpack the _what element of the query. Currently we only support OBJECT queries. @@ -1636,6 +1700,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo */ i = inMap.find("_object_id"); if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + Variant::List list_; ObjectId objId(i->second.asMap()); ManagementObjectMap::iterator iter = managementObjects.find(objId); @@ -1646,6 +1711,10 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo object->setUpdateTime(); if (!object->isDeleted()) { + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; + object->mapEncodeValues(values, true, true); // write both stats and properties objId.mapEncode(oidMap); map_["_values"] = values; @@ -1657,13 +1726,19 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo list_.push_back(map_); } - headers.erase("partial"); + string content; + ListCodec::encode(list_, content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); return; } } else { + // send class-based result. + Variant::List _list; + Variant::List _subList; + unsigned int objCount = 0; + for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); iter++) { @@ -1671,11 +1746,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo if (object->getClassName() == className && (packageName.empty() || object->getPackageName() == packageName)) { - // @todo support multiple object reply per message + if (!object->isDeleted()) { - values.clear(); - list_.clear(); - oidMap.clear(); + Variant::Map map_; + Variant::Map values; + Variant::Map oidMap; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -1690,21 +1765,39 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo object->getClassName(), "_data", object->getMd5Sum()); - list_.push_back(map_); - ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + _subList.push_back(map_); + if (++objCount >= maxV2ReplyObjs) { + objCount = 0; + _list.push_back(_subList); + _subList.clear(); + } } } } + + if (_subList.size()) + _list.push_back(_subList); + + headers["partial"] = Variant(); + string content; + while (_list.size() > 1) { + ListCodec::encode(_list.front().asList(), content); + sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); + _list.pop_front(); + QPID_LOG(trace, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " size=" << content.length()); + } + headers.erase("partial"); + ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); + sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo << " size=" << content.length()); + return; } - // Send empty "non-partial" message to indicate CommandComplete - list_.clear(); - headers.erase("partial"); - ListCodec::encode(list_, content); + // Unrecognized query - Send empty message to indicate CommandComplete + string content; + ListCodec::encode(Variant::List(), content); sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + QPID_LOG(trace, "SENT QueryResponse (empty) to=" << replyTo); } @@ -2007,13 +2100,12 @@ void ManagementAgent::encodePackageIndication(Buffer& buf, } void ManagementAgent::encodeClassIndication(Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter) + const std::string packageName, + const SchemaClassKey key, + uint8_t kind) { - SchemaClassKey key = (*cIter).first; - - buf.putOctet((*cIter).second.kind); - buf.putShortString((*pIter).first); + buf.putOctet(kind); + buf.putShortString(packageName); key.encode(buf); } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index a6e906e2a8..7c8ef994b0 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -288,6 +288,9 @@ private: bool qmf1Support; bool qmf2Support; + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxV2ReplyObjs; # define MA_BUFFER_SIZE 65536 char inputBuffer[MA_BUFFER_SIZE]; @@ -323,8 +326,9 @@ private: void encodePackageIndication (framing::Buffer& buf, PackageMap::iterator pIter); void encodeClassIndication (framing::Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter); + const std::string packageName, + const struct SchemaClassKey key, + uint8_t kind); bool bankInUse (uint32_t bank); uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 5cdf9eca59..670a242c02 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -253,7 +253,7 @@ ManagementObject::ManagementObject(Manageable* _core) : createTime(qpid::sys::Duration(sys::EPOCH, sys::now())), destroyTime(0), updateTime(createTime), configChanged(true), instChanged(true), deleted(false), - coreObject(_core), forcePublish(false) {} + coreObject(_core), flags(0), forcePublish(false) {} void ManagementObject::setUpdateTime() { |