diff options
| author | Ted Ross <tross@apache.org> | 2009-09-29 03:21:49 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-09-29 03:21:49 +0000 |
| commit | 7661c82fc7aaca543582ef45582d87de3c5de5b7 (patch) | |
| tree | 9de25825187c0a45df5880ce74e58befb6c4ec50 /cpp/src/qmf/engine/BrokerProxyImpl.cpp | |
| parent | 576b578d61d0d31082587bf77a25a59da2ba738f (diff) | |
| download | qpid-python-7661c82fc7aaca543582ef45582d87de3c5de5b7.tar.gz | |
QMF Engine updates:
- Connected console handler callbacks
- Added string representations for a number of object classes
- Added a feature that completes query requests sent to disconnected agents
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/engine/BrokerProxyImpl.cpp')
| -rw-r--r-- | cpp/src/qmf/engine/BrokerProxyImpl.cpp | 109 |
1 files changed, 83 insertions, 26 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 36d3ffe361..e296254bf8 100644 --- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -23,6 +23,7 @@ #include "qpid/Address.h" #include "qpid/sys/SystemInfo.h" #include <qpid/log/Statement.h> +#include <qpid/StringUtils.h> #include <string.h> #include <iostream> #include <fstream> @@ -109,18 +110,23 @@ void BrokerProxyImpl::sessionClosed() void BrokerProxyImpl::startProtocol() { - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); + { + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); - agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); + agentList[0] = agent; - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); + requestsOutstanding = 1; + topicBound = false; + uint32_t sequence(seqMgr.reserve()); + Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); + } + + console.impl->eventAgentAdded(agent); } void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) @@ -145,7 +151,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) uint32_t sequence; while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) - seqMgr.dispatch(opcode, sequence, inBuffer); + seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); } bool BrokerProxyImpl::getXmtMessage(Message& item) const @@ -216,6 +222,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(queryContext)); + agent->impl->addSequence(sequence); Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); query.impl->encode(outBuffer); @@ -406,9 +413,23 @@ MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32 return response; } -void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) +void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey) { - // TODO + vector<string> tokens = qpid::split(routingKey, "."); + uint32_t agentBank; + uint64_t timestamp; + + if (routingKey.empty() || tokens.size() != 4) + agentBank = 0; + else + agentBank = ::atoi(tokens[3].c_str()); + + timestamp = inBuffer.getLongLong(); + map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + console.impl->eventAgentHeartbeat(iter->second, timestamp); + } + QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank); } void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/) @@ -481,11 +502,24 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq void BrokerProxyImpl::updateAgentList(ObjectPtr obj) { Value* value = obj->getValue("agentBank"); + Mutex::ScopedLock _lock(lock); if (value != 0 && value->isUint()) { uint32_t agentBank = value->asUint(); if (obj->isDeleted()) { - agentList.erase(agentBank); - QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); + map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank); + if (iter != agentList.end()) { + AgentProxyPtr agent(iter->second); + console.impl->eventAgentDeleted(agent); + agentList.erase(agentBank); + QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list"); + + // + // Release all sequence numbers for requests in-flight to this agent. + // Since the agent is no longer connected, these requests would not + // otherwise complete. + // + agent->impl->releaseInFlight(seqMgr); + } } else { Value* str = obj->getValue("label"); string label; @@ -493,7 +527,9 @@ void BrokerProxyImpl::updateAgentList(ObjectPtr obj) label = str->asString(); map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank); if (iter == agentList.end()) { - agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label)); + AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label)); + agentList[agentBank] = agent; + console.impl->eventAgentAdded(agent); QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank); } } @@ -572,9 +608,11 @@ MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& return new MethodResponse(impl); } -bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer) { + ObjectPtr object; bool completeContext = false; + if (opcode == Protocol::OP_BROKER_RESPONSE) { broker.handleBrokerResponse(buffer, sequence); completeContext = true; @@ -592,15 +630,21 @@ bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buf else if (opcode == Protocol::OP_CLASS_INDICATION) broker.handleClassIndication(buffer, sequence); else if (opcode == Protocol::OP_HEARTBEAT_INDICATION) - broker.handleHeartbeatIndication(buffer, sequence); + broker.handleHeartbeatIndication(buffer, sequence, routingKey); else if (opcode == Protocol::OP_EVENT_INDICATION) broker.handleEventIndication(buffer, sequence); - else if (opcode == Protocol::OP_PROPERTY_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, false); - else if (opcode == Protocol::OP_STATISTIC_INDICATION) - broker.handleObjectIndication(buffer, sequence, false, true); - else if (opcode == Protocol::OP_OBJECT_INDICATION) - broker.handleObjectIndication(buffer, sequence, true, true); + else if (opcode == Protocol::OP_PROPERTY_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, false); + broker.console.impl->eventObjectUpdate(object, true, false); + } + else if (opcode == Protocol::OP_STATISTIC_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, false, true); + broker.console.impl->eventObjectUpdate(object, false, true); + } + else if (opcode == Protocol::OP_OBJECT_INDICATION) { + object = broker.handleObjectIndication(buffer, sequence, true, true); + broker.console.impl->eventObjectUpdate(object, true, true); + } else { QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode); completeContext = true; @@ -627,7 +671,7 @@ void QueryContext::release() broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse)); } -bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) { bool completeContext = false; ObjectPtr object; @@ -635,6 +679,19 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff if (opcode == Protocol::OP_COMMAND_COMPLETE) { broker.handleCommandComplete(buffer, sequence); completeContext = true; + + // + // Visit each agent and remove the sequence from that agent's in-flight list. + // This could be made more efficient because only one agent will have this sequence + // in its list. + // + map<uint32_t, AgentProxyPtr> copy; + { + Mutex::ScopedLock _block(broker.lock); + copy = broker.agentList; + } + for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++) + iter->second->impl->delSequence(sequence); } else if (opcode == Protocol::OP_OBJECT_INDICATION) { object = broker.handleObjectIndication(buffer, sequence, true, true); @@ -655,7 +712,7 @@ void MethodContext::release() broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse)); } -bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer) +bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer) { if (opcode == Protocol::OP_METHOD_RESPONSE) methodResponse = broker.handleMethodResponse(buffer, sequence, schema); |
