From 24fb6939e5420ecae9033687c8c6081a62cd42a5 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 2 Feb 2011 18:11:07 +0000 Subject: QPID-3032 - Bug-fix: The broker management agent and the remote C++ agent hard-coded the assumed value of the "exchange" component of reply-to headers to either "amq.direct" or "qmf.default.direct", depending on the circumstance. This commit fixes this such that message replies are sent to the exchange/key pair supplied in the reply-to header. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1066557 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/agent/ManagementAgentImpl.cpp | 72 ++++++++------- cpp/src/qpid/agent/ManagementAgentImpl.h | 19 ++-- cpp/src/qpid/management/ManagementAgent.cpp | 137 +++++++++++++++++----------- cpp/src/qpid/management/ManagementAgent.h | 23 +++-- 4 files changed, 146 insertions(+), 105 deletions(-) (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 35011db38e..593d403a11 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -362,7 +362,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) methodQueue.pop_front(); { sys::Mutex::ScopedUnlock unlock(agentLock); - invokeMethodRequest(item->body, item->cid, item->replyTo, item->userId); + invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId); delete item; } } @@ -497,7 +497,7 @@ void ManagementAgentImpl::sendHeartbeat() QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } -void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, +void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid, const string& text, uint32_t code) { Variant::Map map; @@ -514,12 +514,12 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string& map["_values"] = values; MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } -void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) +void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk) { sys::Mutex::ScopedLock lock(agentLock); string packageName; @@ -546,7 +546,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc outBuffer.putRawData(body); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - connThreadBody.sendBuffer(outBuffer, outLen, "amq.direct", replyTo); + connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk); QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name); } @@ -561,7 +561,7 @@ void ManagementAgentImpl::handleConsoleAddedIndication() QPID_LOG(trace, "RCVD ConsoleAddedInd"); } -void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId) +void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) { string methodName; bool failed = false; @@ -572,11 +572,9 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& MapCodec::decode(body, inMap); - outMap["_values"] = Variant::Map(); - if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), Manageable::STATUS_PARAMETER_INVALID); failed = true; } else { @@ -595,6 +593,8 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& inArgs = (mid->second).asMap(); } + QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs); + boost::shared_ptr oPtr; { sys::Mutex::ScopedLock lock(agentLock); @@ -604,7 +604,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& } if (oPtr.get() == 0) { - sendException(replyTo, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT), + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT), Manageable::STATUS_UNKNOWN_OBJECT); failed = true; } else { @@ -617,13 +617,13 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& if (iter->first != "_status_code" && iter->first != "_status_text") outMap["_arguments"].asMap()[iter->first] = iter->second; } else { - sendException(replyTo, cid, callMap["_status_text"], callMap["_status_code"]); + sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]); failed = true; } } } catch(types::InvalidConversion& e) { - sendException(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION); failed = true; } } @@ -635,11 +635,11 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& headers["qmf.opcode"] = "_method_response"; QPID_LOG(trace, "SENT MethodResponse map=" << outMap); MapCodec::encode(outMap, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); } } -void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& replyTo) +void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk) { moveNewObjectsLH(); @@ -666,12 +666,12 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, */ i = inMap.find("_what"); if (i == inMap.end()) { - sendException(replyTo, cid, "_what element missing in Query"); + sendException(rte, rtk, cid, "_what element missing in Query"); return; } if (i->second.getType() != qpid::types::VAR_STRING) { - sendException(replyTo, cid, "_what element is not a string"); + sendException(rte, rtk, cid, "_what element is not a string"); return; } @@ -709,8 +709,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, headers.erase("partial"); ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); return; } } else { // match using schema_id, if supplied @@ -771,8 +771,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, if (++objCount >= maxV2ReplyObjs) { objCount = 0; ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk); content.clear(); list_.clear(); } @@ -784,8 +784,8 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, // Send last "non-partial" message to indicate CommandComplete headers.erase("partial"); ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk); } else if (i->second.asString() == "SCHEMA_ID") { headers["qmf.content"] = "_schema_id"; @@ -806,16 +806,16 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, headers.erase("partial"); ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk); } else { // Unknown query target - sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); } } -void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) +void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk) { QPID_LOG(trace, "RCVD AgentLocateRequest"); @@ -829,9 +829,9 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); + connThreadBody.sendBuffer(content, cid, headers, rte, rtk); - QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); + QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); { sys::Mutex::ScopedLock lock(agentLock); @@ -839,12 +839,12 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, } } -void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo, const string& userId) +void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId) { if (extThread) { sys::Mutex::ScopedLock lock(agentLock); - methodQueue.push_back(new QueuedMethod(cid, replyTo, body, userId)); + methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId)); if (pipeHandle != 0) { pipeHandle->write("X", 1); } else if (notifyable != 0) { @@ -863,7 +863,7 @@ void ManagementAgentImpl::handleMethodRequest(const string& body, const string& inCallback = false; } } else { - invokeMethodRequest(body, cid, replyTo, userId); + invokeMethodRequest(body, cid, rte, rtk, userId); } QPID_LOG(trace, "RCVD MethodRequest"); @@ -871,10 +871,12 @@ void ManagementAgentImpl::handleMethodRequest(const string& body, const string& void ManagementAgentImpl::received(Message& msg) { + string replyToExchange; string replyToKey; framing::MessageProperties mp = msg.getMessageProperties(); if (mp.hasReplyTo()) { const framing::ReplyTo& rt = mp.getReplyTo(); + replyToExchange = rt.getExchange(); replyToKey = rt.getRoutingKey(); } @@ -887,9 +889,9 @@ void ManagementAgentImpl::received(Message& msg) string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode"); string cid = msg.getMessageProperties().getCorrelationId(); - if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToKey); - else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToKey, userId); - else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToKey); + if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey); + else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId); + else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey); else { QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!"); } @@ -906,7 +908,7 @@ void ManagementAgentImpl::received(Message& msg) if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToKey); + if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); else if (opcode == 'x') handleConsoleAddedIndication(); else QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode)); diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 09d98d237b..bf340777d1 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -128,11 +128,12 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen }; struct QueuedMethod { - QueuedMethod(const std::string& _cid, const std::string& _reply, const std::string& _body, const std::string& _uid) : - cid(_cid), replyTo(_reply), body(_body), userId(_uid) {} + QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) : + cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {} std::string cid; - std::string replyTo; + std::string replyToExchange; + std::string replyToKey; std::string body; std::string userId; }; @@ -278,16 +279,16 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint8_t type=ManagementItem::CLASS_KIND_TABLE); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendHeartbeat(); - void sendException(const std::string& replyToKey, const std::string& cid, + void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1); void handlePackageRequest (qpid::framing::Buffer& inBuffer); void handleClassQuery (qpid::framing::Buffer& inBuffer); - void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& replyTo); - void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& replyTo, const std::string& userId); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId); - void handleGetQuery (const std::string& body, const std::string& cid, const std::string& replyTo); - void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); - void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo, const std::string& userId); + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId); void handleConsoleAddedIndication(); void getHeartbeatContent (qpid::types::Variant::Map& map); }; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 0fb23bdb7d..23c999a98a 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -503,7 +503,7 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) void ManagementAgent::sendBufferLH(Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, - string routingKey) + const string& routingKey) { if (suppressed) { QPID_LOG(debug, "Suppressing management message to " << routingKey); @@ -548,6 +548,17 @@ void ManagementAgent::sendBufferLH(Buffer& buf, } +void ManagementAgent::sendBufferLH(Buffer& buf, + uint32_t length, + const string& exchange, + const string& routingKey) +{ + qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); + if (ex.get() != 0) + sendBufferLH(buf, length, ex, routingKey); +} + + // NOTE WELL: assumes userLock is held by caller (LH) // NOTE EVEN WELLER: drops this lock when delivering the message!!! void ManagementAgent::sendBufferLH(const string& data, @@ -612,6 +623,20 @@ void ManagementAgent::sendBufferLH(const string& data, } +void ManagementAgent::sendBufferLH(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + const string& exchange, + const string& routingKey, + uint64_t ttl_msec) +{ + qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); + if (ex.get() != 0) + sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); +} + + /** Objects that have been added since the last periodic poll are temporarily * saved in the newManagementObjects list. This allows objects to be * added without needing to block on the userLock (addLock is used instead). @@ -1106,7 +1131,7 @@ void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t s replyToKey << " seq=" << sequence); } -void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& cid, +void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, const string& text, uint32_t code, bool viaLocal) { static const string addr_exchange("qmf.default.direct"); @@ -1125,7 +1150,7 @@ void ManagementAgent::sendExceptionLH(const string& replyToKey, const string& ci map["_values"] = values; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyToKey); + sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } @@ -1295,7 +1320,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl } -void ManagementAgent::handleMethodRequestLH (const string& body, const string& replyTo, +void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, const string& cid, const ConnectionToken* connToken, bool viaLocal) { moveNewObjectsLH(); @@ -1317,7 +1342,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), Manageable::STATUS_PARAMETER_INVALID, viaLocal); return; } @@ -1336,7 +1361,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r inArgs = (mid->second).asMap(); } } catch(exception& e) { - sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } @@ -1345,7 +1370,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r if (iter == managementObjects.end() || iter->second->isDeleted()) { stringstream estr; estr << "No object found with ID=" << objId; - sendExceptionLH(replyTo, cid, estr.str(), 1, viaLocal); + sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); return; } @@ -1355,7 +1380,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); if (i != disallowed.end()) { - sendExceptionLH(replyTo, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); + sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); return; } @@ -1366,7 +1391,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - sendExceptionLH(replyTo, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), Manageable::STATUS_FORBIDDEN, viaLocal); return; } @@ -1376,7 +1401,7 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() << ":" << iter->second->getClassName() << " method=" << - methodName << " replyTo=" << replyTo << " objId=" << objId << " inArgs=" << inArgs); + methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); try { sys::Mutex::ScopedUnlock u(userLock); @@ -1391,18 +1416,18 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r } else error = callMap["_status_text"].asString(); } catch(exception& e) { - sendExceptionLH(replyTo, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } if (errorCode != 0) { - sendExceptionLH(replyTo, cid, error, errorCode, viaLocal); + sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); return; } MapCodec::encode(outMap, content); - sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); - QPID_LOG(debug, "SEND MethodResponse (v2) to=" << replyTo << " seq=" << cid << " map=" << outMap); + sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); } @@ -1549,7 +1574,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(reinterpret_cast(&data[0]), data.size()); } -void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1558,7 +1583,7 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl key.decode(inBuffer); QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << - "), replyTo=" << replyToKey << " seq=" << sequence); + "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { @@ -1574,17 +1599,17 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& repl classInfo.appendSchema(outBuffer); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND SchemaResponse to=" << replyToKey << " seq=" << sequence); + sendBufferLH(outBuffer, outLen, rte, rtk); + QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); } else - sendCommandCompleteLH(replyToKey, sequence, 1, "Schema not available"); + sendCommandCompleteLH(rtk, sequence, 1, "Schema not available"); } else - sendCommandCompleteLH(replyToKey, sequence, 1, "Class key not found"); + sendCommandCompleteLH(rtk, sequence, 1, "Class key not found"); } else - sendCommandCompleteLH(replyToKey, sequence, 1, "Package not found"); + sendCommandCompleteLH(rtk, sequence, 1, "Package not found"); } void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) @@ -1844,7 +1869,7 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe } -void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo, const string& cid, bool viaLocal) +void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) { moveNewObjectsLH(); @@ -1865,17 +1890,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo */ i = inMap.find("_what"); if (i == inMap.end()) { - sendExceptionLH(replyTo, cid, "_what element missing in Query"); + sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); return; } if (i->second.getType() != qpid::types::VAR_STRING) { - sendExceptionLH(replyTo, cid, "_what element is not a string"); + sendExceptionLH(rte, rtk, cid, "_what element is not a string"); return; } if (i->second.asString() != "OBJECT") { - sendExceptionLH(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); return; } @@ -1934,8 +1959,8 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo string content; ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << replyTo); + sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); return; } } else { @@ -1987,27 +2012,26 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& replyTo string content; while (_list.size() > 1) { ListCodec::encode(_list.front().asList(), content); - sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); _list.pop_front(); - QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << replyTo << " len=" << content.length()); + QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << replyTo << " len=" << content.length()); + sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); return; } // Unrecognized query - Send empty message to indicate CommandComplete string content; ListCodec::encode(Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", v2Direct, replyTo); - QPID_LOG(debug, "SENT QueryResponse (empty) to=" << replyTo); + sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); } -void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo, - const string& cid) +void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) { QPID_LOG(debug, "RCVD AgentLocateRequest"); @@ -2025,10 +2049,10 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo string content; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", v2Direct, replyTo); + sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); clientWasAdded = true; - QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << replyTo); + QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); } @@ -2155,13 +2179,14 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) msg.getFrames().getHeaders()->get(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); - string replyToKey = rt.getRoutingKey(); + string rte = rt.getExchange(); + string rtk = rt.getRoutingKey(); string cid; if (p && p->hasCorrelationId()) cid = p->getCorrelationId(); if (mapMsg) { - sendExceptionLH(replyToKey, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), Manageable::STATUS_FORBIDDEN, false); } else { @@ -2173,7 +2198,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); outLen = MA_BUFFER_SIZE - outBuffer.available(); outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBufferLH(outBuffer, outLen, rte, rtk); } QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); @@ -2187,12 +2212,14 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) { - string replyToKey; + string rte; + string rtk; const framing::MessageProperties* p = msg.getFrames().getHeaders()->get(); if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); - replyToKey = rt.getRoutingKey(); + rte = rt.getExchange(); + rtk = rt.getRoutingKey(); } else return; @@ -2224,11 +2251,11 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) } if (opcode == "_method_request") - return handleMethodRequestLH(body, replyToKey, cid, msg.getPublisher(), viaLocal); + return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); else if (opcode == "_query_request") - return handleGetQueryLH(body, replyToKey, cid, viaLocal); + return handleGetQueryLH(body, rte, rtk, cid, viaLocal); else if (opcode == "_agent_locate_request") - return handleLocateRequestLH(body, replyToKey, cid); + return handleLocateRequestLH(body, rte, rtk, cid); QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; @@ -2241,16 +2268,16 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) if (!checkHeader(inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, replyToKey, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, replyToKey, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, replyToKey, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, replyToKey, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, replyToKey, sequence, msg.getPublisher()); + if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence); + else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence); + else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence); + else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence); + else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence); + else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence); + else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence); + else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence); + else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); } } diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index d434fe44da..0db19594a7 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -354,7 +354,11 @@ private: void sendBufferLH(framing::Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); + const std::string& routingKey); + void sendBufferLH(framing::Buffer& buf, + uint32_t length, + const std::string& exchange, + const std::string& routingKey); void sendBufferLH(const std::string& data, const std::string& cid, const qpid::types::Variant::Map& headers, @@ -362,6 +366,13 @@ private: qpid::broker::Exchange::shared_ptr exchange, const std::string& routingKey, uint64_t ttl_msec = 0); + void sendBufferLH(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); void moveNewObjectsLH(); bool moveDeletedObjectsLH(); @@ -386,20 +397,20 @@ private: void deleteOrphanedAgentsLH(); void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, uint32_t code = 0, const std::string& text = "OK"); - void sendExceptionLH(const std::string& replyToKey, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, const std::string& replyToKey, const std::string& cid, bool viaLocal); - void handleMethodRequestLH (const std::string& body, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); - void handleLocateRequestLH (const std::string& body, const std::string &replyToKey, const std::string& cid); + void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); + void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); + void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); size_t validateSchema(framing::Buffer&, uint8_t kind); -- cgit v1.2.1