diff options
| author | Ted Ross <tross@apache.org> | 2008-08-01 18:36:25 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2008-08-01 18:36:25 +0000 |
| commit | b272cc92fa7a4bcd49fb8da50b93bfb4d015fda7 (patch) | |
| tree | 1fbdc79f022c79ed5972fdf4b8994ce092f3b181 /cpp/src/qpid/management/ManagementBroker.cpp | |
| parent | 978db5706dc7930325362fc662c2ae6941b1faee (diff) | |
| download | qpid-python-b272cc92fa7a4bcd49fb8da50b93bfb4d015fda7.tar.gz | |
QPID-1174 - Clean up agent objects when the remote agent disconnects
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@681773 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management/ManagementBroker.cpp')
| -rw-r--r-- | cpp/src/qpid/management/ManagementBroker.cpp | 70 |
1 files changed, 51 insertions, 19 deletions
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp index 15263b5f2a..ec8e6fe436 100644 --- a/cpp/src/qpid/management/ManagementBroker.cpp +++ b/cpp/src/qpid/management/ManagementBroker.cpp @@ -26,6 +26,7 @@ #include <qpid/broker/MessageDelivery.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" +#include "qpid/broker/ConnectionState.h" #include <list> #include <iostream> #include <fstream> @@ -375,7 +376,10 @@ void ManagementBroker::PeriodicProcessing (void) iter++) managementObjects.erase (*iter); - deleteList.clear (); + if (!deleteList.empty()) { + deleteList.clear(); + deleteOrphanedAgentsLH(); + } } void ManagementBroker::sendCommandComplete (string replyToKey, uint32_t sequence, @@ -664,44 +668,72 @@ uint32_t ManagementBroker::assignBankLH (uint32_t requestedBank) return requestedBank; } -void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence) +void ManagementBroker::deleteOrphanedAgentsLH() +{ + vector<uint64_t> deleteList; + + for (RemoteAgentMap::iterator aIter = remoteAgents.begin(); aIter != remoteAgents.end(); aIter++) { + uint64_t connectionRef = aIter->first; + bool found = false; + + for (ManagementObjectMap::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) { + if (iter->first == connectionRef && !iter->second->isDeleted()) { + found = true; + break; + } + } + + if (!found) { + deleteList.push_back(connectionRef); + delete aIter->second; + } + } + + for (vector<uint64_t>::iterator dIter = deleteList.begin(); dIter != deleteList.end(); dIter++) { + + remoteAgents.erase(*dIter); + } + + deleteList.clear(); +} + +void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBank; uint32_t assignedBank; - string sessionName; + uint64_t connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; - inBuffer.getShortString (label); - inBuffer.getShortString (sessionName); - systemId.decode (inBuffer); - requestedBank = inBuffer.getLong (); - assignedBank = assignBankLH (requestedBank); - - // TODO: Make a pass over the agents and delete any that no longer have a session. - - RemoteAgentMap::iterator aIter = remoteAgents.find (sessionName); - if (aIter != remoteAgents.end()) - { + moveNewObjectsLH(); + deleteOrphanedAgentsLH(); + RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); + if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandComplete (replyToKey, sequence, 1, "Session already has remote agent"); + sendCommandComplete (replyToKey, sequence, 1, "Connection already has remote agent"); return; } - // TODO: Reject requests for which the session name does not match an existing session. + inBuffer.getShortString (label); + systemId.decode (inBuffer); + requestedBank = inBuffer.getLong (); + assignedBank = assignBankLH (requestedBank); RemoteAgent* agent = new RemoteAgent; agent->objIdBank = assignedBank; agent->routingKey = replyToKey; + agent->connectionRef = connectionRef; agent->mgmtObject = new management::Agent (this, agent); - agent->mgmtObject->set_sessionName (sessionName); + agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); agent->mgmtObject->set_systemId (systemId); agent->mgmtObject->set_objectIdBank (assignedBank); addObject (agent->mgmtObject); - remoteAgents[sessionName] = agent; + remoteAgents[connectionRef] = agent; // Send an Attach Response Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); @@ -788,7 +820,7 @@ void ManagementBroker::dispatchAgentCommandLH (Message& msg) else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence); } |
