summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/engine/BrokerProxyImpl.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-09-29 03:21:49 +0000
committerTed Ross <tross@apache.org>2009-09-29 03:21:49 +0000
commit7661c82fc7aaca543582ef45582d87de3c5de5b7 (patch)
tree9de25825187c0a45df5880ce74e58befb6c4ec50 /cpp/src/qmf/engine/BrokerProxyImpl.cpp
parent576b578d61d0d31082587bf77a25a59da2ba738f (diff)
downloadqpid-python-7661c82fc7aaca543582ef45582d87de3c5de5b7.tar.gz
QMF Engine updates:
- Connected console handler callbacks - Added string representations for a number of object classes - Added a feature that completes query requests sent to disconnected agents git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@819819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qmf/engine/BrokerProxyImpl.cpp')
-rw-r--r--cpp/src/qmf/engine/BrokerProxyImpl.cpp109
1 files changed, 83 insertions, 26 deletions
diff --git a/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 36d3ffe361..e296254bf8 100644
--- a/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -23,6 +23,7 @@
#include "qpid/Address.h"
#include "qpid/sys/SystemInfo.h"
#include <qpid/log/Statement.h>
+#include <qpid/StringUtils.h>
#include <string.h>
#include <iostream>
#include <fstream>
@@ -109,18 +110,23 @@ void BrokerProxyImpl::sessionClosed()
void BrokerProxyImpl::startProtocol()
{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ {
+ Mutex::ScopedLock _lock(lock);
+ char rawbuffer[512];
+ Buffer buffer(rawbuffer, 512);
- agentList[0] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
+ agentList[0] = agent;
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ requestsOutstanding = 1;
+ topicBound = false;
+ uint32_t sequence(seqMgr.reserve());
+ Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
+ sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
+ }
+
+ console.impl->eventAgentAdded(agent);
}
void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
@@ -145,7 +151,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
uint32_t sequence;
while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, inBuffer);
+ seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
}
bool BrokerProxyImpl::getXmtMessage(Message& item) const
@@ -216,6 +222,7 @@ void BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(queryContext));
+ agent->impl->addSequence(sequence);
Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
query.impl->encode(outBuffer);
@@ -406,9 +413,23 @@ MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32
return response;
}
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
+void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
{
- // TODO
+ vector<string> tokens = qpid::split(routingKey, ".");
+ uint32_t agentBank;
+ uint64_t timestamp;
+
+ if (routingKey.empty() || tokens.size() != 4)
+ agentBank = 0;
+ else
+ agentBank = ::atoi(tokens[3].c_str());
+
+ timestamp = inBuffer.getLongLong();
+ map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ console.impl->eventAgentHeartbeat(iter->second, timestamp);
+ }
+ QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
}
void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq*/)
@@ -481,11 +502,24 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq
void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
{
Value* value = obj->getValue("agentBank");
+ Mutex::ScopedLock _lock(lock);
if (value != 0 && value->isUint()) {
uint32_t agentBank = value->asUint();
if (obj->isDeleted()) {
- agentList.erase(agentBank);
- QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+ map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
+ if (iter != agentList.end()) {
+ AgentProxyPtr agent(iter->second);
+ console.impl->eventAgentDeleted(agent);
+ agentList.erase(agentBank);
+ QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
+
+ //
+ // Release all sequence numbers for requests in-flight to this agent.
+ // Since the agent is no longer connected, these requests would not
+ // otherwise complete.
+ //
+ agent->impl->releaseInFlight(seqMgr);
+ }
} else {
Value* str = obj->getValue("label");
string label;
@@ -493,7 +527,9 @@ void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
label = str->asString();
map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
if (iter == agentList.end()) {
- agentList[agentBank] = AgentProxyPtr(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
+ agentList[agentBank] = agent;
+ console.impl->eventAgentAdded(agent);
QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
}
}
@@ -572,9 +608,11 @@ MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string&
return new MethodResponse(impl);
}
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
{
+ ObjectPtr object;
bool completeContext = false;
+
if (opcode == Protocol::OP_BROKER_RESPONSE) {
broker.handleBrokerResponse(buffer, sequence);
completeContext = true;
@@ -592,15 +630,21 @@ bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buf
else if (opcode == Protocol::OP_CLASS_INDICATION)
broker.handleClassIndication(buffer, sequence);
else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence);
+ broker.handleHeartbeatIndication(buffer, sequence, routingKey);
else if (opcode == Protocol::OP_EVENT_INDICATION)
broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, false);
- else if (opcode == Protocol::OP_STATISTIC_INDICATION)
- broker.handleObjectIndication(buffer, sequence, false, true);
- else if (opcode == Protocol::OP_OBJECT_INDICATION)
- broker.handleObjectIndication(buffer, sequence, true, true);
+ else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, false);
+ broker.console.impl->eventObjectUpdate(object, true, false);
+ }
+ else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, false, true);
+ broker.console.impl->eventObjectUpdate(object, false, true);
+ }
+ else if (opcode == Protocol::OP_OBJECT_INDICATION) {
+ object = broker.handleObjectIndication(buffer, sequence, true, true);
+ broker.console.impl->eventObjectUpdate(object, true, true);
+ }
else {
QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
completeContext = true;
@@ -627,7 +671,7 @@ void QueryContext::release()
broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
}
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
bool completeContext = false;
ObjectPtr object;
@@ -635,6 +679,19 @@ bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buff
if (opcode == Protocol::OP_COMMAND_COMPLETE) {
broker.handleCommandComplete(buffer, sequence);
completeContext = true;
+
+ //
+ // Visit each agent and remove the sequence from that agent's in-flight list.
+ // This could be made more efficient because only one agent will have this sequence
+ // in its list.
+ //
+ map<uint32_t, AgentProxyPtr> copy;
+ {
+ Mutex::ScopedLock _block(broker.lock);
+ copy = broker.agentList;
+ }
+ for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
+ iter->second->impl->delSequence(sequence);
}
else if (opcode == Protocol::OP_OBJECT_INDICATION) {
object = broker.handleObjectIndication(buffer, sequence, true, true);
@@ -655,7 +712,7 @@ void MethodContext::release()
broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
}
-bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, Buffer& buffer)
+bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
{
if (opcode == Protocol::OP_METHOD_RESPONSE)
methodResponse = broker.handleMethodResponse(buffer, sequence, schema);