diff options
| author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-18 14:51:31 +0000 |
|---|---|---|
| committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-01-18 14:51:31 +0000 |
| commit | 7db454bc1eae3744c676fe9e8ddd6e999cee13f1 (patch) | |
| tree | 7f3261175f7ac13162a0a902474fe94216eb21e9 /cpp/src/qpid | |
| parent | 12ee5f96c705084ebc575c6396fd2d2a714176c1 (diff) | |
| download | qpid-python-7db454bc1eae3744c676fe9e8ddd6e999cee13f1.tar.gz | |
QPID-2997: remove oid disambiguation, re-order mgmt object status updates.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1060401 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 403 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 4 |
3 files changed, 232 insertions, 185 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index cb33887fc8..7459ac9416 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -306,12 +306,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId { sys::Mutex::ScopedLock lock(addLock); - ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); - while (destIter != newManagementObjects.end()) { - objId.disambiguate(); - destIter = newManagementObjects.find(objId); - } - newManagementObjects[objId] = object; + newManagementObjects.push_back(object); } QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; @@ -337,12 +332,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, object->setObjectId(objId); { sys::Mutex::ScopedLock lock(addLock); - ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); - while (destIter != newManagementObjects.end()) { - objId.disambiguate(); - destIter = newManagementObjects.find(objId); - } - newManagementObjects[objId] = object; + newManagementObjects.push_back(object); } QPID_LOG(debug, "Management object added: " << objId.getV2Key()); return objId; @@ -621,22 +611,50 @@ void ManagementAgent::sendBufferLH(const string& data, } +/** Objects that have been added since the last periodic poll are temporarily + * saved in the newManagementObjects list. This allows objects to be + * added without needing to block on the userLock (addLock is used instead). + * These new objects need to be integrated into the object database + * (managementObjects) *before* they can be properly managed. This routine + * performs the integration. + * + * Note well: objects on the newManagementObjects list may have been + * marked as "deleted", and, possibly re-added. This would result in + * duplicate object ids. To avoid clashes, don't put deleted objects + * into the active object database. + */ void ManagementAgent::moveNewObjectsLH() { sys::Mutex::ScopedLock lock (addLock); - for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); - iter != newManagementObjects.end (); - iter++) { - ObjectId oid = iter->first; - ManagementObjectMap::iterator destIter = managementObjects.find(oid); - while (destIter != managementObjects.end()) { - oid.disambiguate(); - destIter = managementObjects.find(oid); - } + while (!newManagementObjects.empty()) { + ManagementObject *object = newManagementObjects.back(); + newManagementObjects.pop_back(); - managementObjects[oid] = iter->second; + if (object->isDeleted()) { + DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + delete object; + } else { // add to active object list, check for duplicates. + ObjectId oid = object->getObjectId(); + ManagementObjectMap::iterator destIter = managementObjects.find(oid); + if (destIter != managementObjects.end()) { + // duplicate found. It is OK if the old object has been marked + // deleted... + ManagementObject *oldObj = destIter->second; + if (oldObj->isDeleted()) { + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); + delete oldObj; + } else { + // Duplicate non-deleted objects? This is a user error - oids must be unique. + // for now, leak the old object (safer than deleting - may still be referenced) + // and complain loudly... + QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + } + } + managementObjects[oid] = object; + } } - newManagementObjects.clear(); } void ManagementAgent::periodicProcessing (void) @@ -670,7 +688,126 @@ void ManagementAgent::periodicProcessing (void) clientWasAdded = false; + // first send the pending deletes before sending updates. This prevents a + // "false delete" scenario: if an object was deleted then re-added during + // the last poll cycle, it will have a delete entry and an active entry. + // if we sent the active update first, _then_ the delete update, clients + // would incorrectly think the object was deleted. See QPID-2997 + // bool objectsDeleted = moveDeletedObjectsLH(); + if (!pendingDeletedObjs.empty()) { + // use a temporary copy of the pending deletes so dropping the lock when + // the buffer is sent is safe. + PendingDeletedObjsMap tmp(pendingDeletedObjs); + pendingDeletedObjs.clear(); + + for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + std::string packageName; + std::string className; + Buffer msgBuffer(msgChars, BUFSIZE); + uint32_t v1Objs = 0; + uint32_t v2Objs = 0; + Variant::List list_; + + size_t pos = mIter->first.find(":"); + packageName = mIter->first.substr(0, pos); + className = mIter->first.substr(pos+1); + + for (DeletedObjectList::iterator lIter = mIter->second.begin(); + lIter != mIter->second.end(); lIter++) { + std::string oid = (*lIter)->objectId; + if (!(*lIter)->encodedV1Config.empty()) { + encodeHeader(msgBuffer, 'c'); + msgBuffer.putRawData((*lIter)->encodedV1Config); + QPID_LOG(trace, "Deleting V1 properties " << oid + << " len=" << (*lIter)->encodedV1Config.size()); + v1Objs++; + } + if (!(*lIter)->encodedV1Inst.empty()) { + encodeHeader(msgBuffer, 'i'); + msgBuffer.putRawData((*lIter)->encodedV1Inst); + QPID_LOG(trace, "Deleting V1 statistics " << oid + << " len=" << (*lIter)->encodedV1Inst.size()); + v1Objs++; + } + if (v1Objs && msgBuffer.available() < HEADROOM) { + v1Objs = 0; + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" + << key.str() << " len=" << contentSize); + } + + if (!(*lIter)->encodedV2.empty()) { + QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); + list_.push_back((*lIter)->encodedV2); + if (++v2Objs >= maxV2ReplyObjs) { + v2Objs = 0; + + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } + } // end current list + + // send any remaining objects... + + if (v1Objs) { + contentSize = BUFSIZE - msgBuffer.available(); + stringstream key; + key << "console.obj.1.0." << packageName << "." << className; + msgBuffer.reset(); + sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); + } + + if (!list_.empty()) { + string content; + ListCodec::encode(list_, content); + list_.clear(); + if (content.length()) { + stringstream key; + Variant::Map headers; + key << "agent.ind.data." << keyifyNameStr(packageName) + << "." << keyifyNameStr(className) + << "." << vendorNameKey + << "." << productNameKey; + if (!instanceNameKey.empty()) + key << "." << instanceNameKey; + + headers["method"] = "indication"; + headers["qmf.opcode"] = "_data_indication"; + headers["qmf.content"] = "_data"; + headers["qmf.agent"] = name_address; + + sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); + } + } + } // end map + } // // Process the entire object map. Remember: we drop the userLock each time we call @@ -828,122 +965,6 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - - // now send the pending deletes. Make a temporary copy of the pending deletes so dropping the - // lock when the buffer is sent is safe. - // - if (!pendingDeletedObjs.empty()) { - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); - - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { - std::string packageName; - std::string className; - Buffer msgBuffer(msgChars, BUFSIZE); - uint32_t v1Objs = 0; - uint32_t v2Objs = 0; - Variant::List list_; - - size_t pos = mIter->first.find(":"); - packageName = mIter->first.substr(0, pos); - className = mIter->first.substr(pos+1); - - for (DeletedObjectList::iterator lIter = mIter->second.begin(); - lIter != mIter->second.end(); lIter++) { - std::string oid = (*lIter)->objectId; - if (!(*lIter)->encodedV1Config.empty()) { - encodeHeader(msgBuffer, 'c'); - msgBuffer.putRawData((*lIter)->encodedV1Config); - QPID_LOG(trace, "Deleting V1 properties " << oid - << " len=" << (*lIter)->encodedV1Config.size()); - v1Objs++; - } - if (!(*lIter)->encodedV1Inst.empty()) { - encodeHeader(msgBuffer, 'i'); - msgBuffer.putRawData((*lIter)->encodedV1Inst); - QPID_LOG(trace, "Deleting V1 statistics " << oid - << " len=" << (*lIter)->encodedV1Inst.size()); - v1Objs++; - } - if (v1Objs && msgBuffer.available() < HEADROOM) { - v1Objs = 0; - contentSize = BUFSIZE - msgBuffer.available(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" - << key.str() << " len=" << contentSize); - } - - if (!(*lIter)->encodedV2.empty()) { - QPID_LOG(trace, "Deleting V2 " << "map=" << (*lIter)->encodedV2); - list_.push_back((*lIter)->encodedV2); - if (++v2Objs >= maxV2ReplyObjs) { - v2Objs = 0; - - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } - } // end current list - - // send any remaining objects... - - if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); - stringstream key; - key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); - } - - if (!list_.empty()) { - string content; - ListCodec::encode(list_, content); - list_.clear(); - if (content.length()) { - stringstream key; - Variant::Map headers; - key << "agent.ind.data." << keyifyNameStr(packageName) - << "." << keyifyNameStr(className) - << "." << vendorNameKey - << "." << productNameKey; - if (!instanceNameKey.empty()) - key << "." << instanceNameKey; - - headers["method"] = "indication"; - headers["qmf.opcode"] = "_data_indication"; - headers["qmf.content"] = "_data"; - headers["qmf.agent"] = name_address; - - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK - QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); - } - } - } // end map - } - if (objectsDeleted) deleteOrphanedAgentsLH(); // heartbeat generation @@ -2619,13 +2640,24 @@ void ManagementAgent::importAgents(qpid::framing::Buffer& inBuf) { } namespace { -bool isDeleted(const ManagementObjectMap::value_type& value) { +bool isDeletedMap(const ManagementObjectMap::value_type& value) { return value.second->isDeleted(); } +bool isDeletedVector(const ManagementObjectVector::value_type& value) { + return value->isDeleted(); +} + string summarizeMap(const char* name, const ManagementObjectMap& map) { ostringstream o; - size_t deleted = std::count_if(map.begin(), map.end(), isDeleted); + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedMap); + o << map.size() << " " << name << " (" << deleted << " deleted), "; + return o.str(); +} + +string summarizeVector(const char* name, const ManagementObjectVector& map) { + ostringstream o; + size_t deleted = std::count_if(map.begin(), map.end(), isDeletedVector); o << map.size() << " " << name << " (" << deleted << " deleted), "; return o.str(); } @@ -2639,6 +2671,15 @@ string dumpMap(const ManagementObjectMap& map) { return o.str(); } +string dumpVector(const ManagementObjectVector& map) { + ostringstream o; + for (ManagementObjectVector::const_iterator i = map.begin(); i != map.end(); ++i) { + o << endl << " " << (*i)->getObjectId().getV2Key() + << ((*i)->isDeleted() ? " (deleted)" : ""); + } + return o.str(); +} + } // namespace string ManagementAgent::summarizeAgents() { @@ -2658,14 +2699,14 @@ void ManagementAgent::debugSnapshot(const char* title) { QPID_LOG(debug, title << ": management snapshot: " << packages.size() << " packages, " << summarizeMap("objects", managementObjects) - << summarizeMap("new objects ", newManagementObjects) + << summarizeVector("new objects ", newManagementObjects) << pendingDeletedObjs.size() << " pending deletes" << summarizeAgents()); QPID_LOG_IF(trace, managementObjects.size(), title << ": objects" << dumpMap(managementObjects)); QPID_LOG_IF(trace, newManagementObjects.size(), - title << ": new objects" << dumpMap(newManagementObjects)); + title << ": new objects" << dumpVector(newManagementObjects)); } Variant::Map ManagementAgent::toMap(const FieldTable& from) @@ -2910,6 +2951,45 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) } +// construct a DeletedObject from a management object. +ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) + : packageName(src->getPackageName()), + className(src->getClassName()) +{ + bool send_stats = (src->hasInst() && (src->getInstChanged() || src->getForcePublish())); + + stringstream oid; + oid << src->getObjectId(); + objectId = oid.str(); + + if (v1) { + src->writeProperties(encodedV1Config); + if (send_stats) { + src->writeStatistics(encodedV1Inst); + } + } + + if (v2) { + Variant::Map map_; + Variant::Map values; + Variant::Map oid; + + src->getObjectId().mapEncode(oid); + map_["_object_id"] = oid; + map_["_schema_id"] = mapEncodeSchemaId(src->getPackageName(), + src->getClassName(), + "_data", + src->getMd5Sum()); + src->writeTimestamps(map_); + src->mapEncodeValues(values, true, send_stats); + map_["_values"] = values; + + encodedV2 = map_; + } +} + + + // construct a DeletedObject from an encoded representation. Used by // clustering to move deleted objects between clustered brokers. See // DeletedObject::encode() for the reverse. @@ -2966,42 +3046,9 @@ bool ManagementAgent::moveDeletedObjectsLH() { { ManagementObject* delObj = iter->second; assert(delObj->isDeleted()); - DeletedObject::shared_ptr dptr(new DeletedObject()); - std::string classkey(delObj->getPackageName() + std::string(":") + delObj->getClassName()); - bool send_stats = (delObj->hasInst() && (delObj->getInstChanged() || delObj->getForcePublish())); - - dptr->packageName = delObj->getPackageName(); - dptr->className = delObj->getClassName(); - stringstream oid; - oid << delObj->getObjectId(); - dptr->objectId = oid.str(); - - if (qmf1Support) { - delObj->writeProperties(dptr->encodedV1Config); - if (send_stats) { - delObj->writeStatistics(dptr->encodedV1Inst); - } - } - - if (qmf2Support) { - Variant::Map map_; - Variant::Map values; - Variant::Map oid; - - delObj->getObjectId().mapEncode(oid); - map_["_object_id"] = oid; - map_["_schema_id"] = mapEncodeSchemaId(delObj->getPackageName(), - delObj->getClassName(), - "_data", - delObj->getMd5Sum()); - delObj->writeTimestamps(map_); - delObj->mapEncodeValues(values, true, send_stats); - map_["_values"] = values; - - dptr->encodedV2 = map_; - } + DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[classkey].push_back(dptr); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); managementObjects.erase(iter->first); delete iter->second; } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 87c39a67bd..2202e2fc98 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -159,13 +159,17 @@ public: class DeletedObject { public: typedef boost::shared_ptr<DeletedObject> shared_ptr; - DeletedObject() {}; + DeletedObject(ManagementObject *, bool v1, bool v2); DeletedObject( const std::string &encoded ); ~DeletedObject() {}; void encode( std::string& toBuffer ); + const std::string getKey() const { + // used to batch up objects of the same class type + return std::string(packageName + std::string(":") + className); + } private: - friend class ManagementAgent; + friend class ManagementAgent; std::string packageName; std::string className; @@ -280,7 +284,7 @@ private: // // Protected by addLock // - ManagementObjectMap newManagementObjects; + ManagementObjectVector newManagementObjects; framing::Uuid uuid; diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index cfdd58ed53..b4d469afbe 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -187,10 +187,6 @@ void ObjectId::setV2Key(const ManagementObject& object) v2Key = oname.str(); } -void ObjectId::disambiguate() -{ - v2Key = v2Key + "_"; -} // encode as V2-format map void ObjectId::mapEncode(types::Variant::Map& map) const |
