diff options
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
| -rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 250 |
1 files changed, 126 insertions, 124 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 28c324cc02..eca48d6b83 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -28,7 +28,8 @@ #include "qmf/SchemaImpl.h" #include "qmf/DataAddrImpl.h" #include "qmf/DataImpl.h" -#include "qmf/Query.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Condition.h" #include "qpid/sys/Thread.h" @@ -86,11 +87,14 @@ namespace qmf { private: typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; mutable qpid::sys::Mutex lock; qpid::sys::Condition cond; Connection connection; Session session; + Sender directSender; + Sender topicSender; string domain; Variant::Map attributes; Variant::Map options; @@ -103,6 +107,7 @@ namespace qmf { uint32_t interval; uint64_t lastHeartbeat; uint64_t lastVisit; + bool forceHeartbeat; bool externalStorage; bool autoAllowQueries; bool autoAllowMethods; @@ -110,21 +115,20 @@ namespace qmf { string directBase; string topicBase; - set<string> packages; - map<SchemaId, Schema, SchemaIdCompare> schemata; + SchemaMap schemata; DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompare> schemaIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; void checkOpen(); void setAgentName(); void enqueueEvent(const AgentEvent&); - void handleLocateRequest(const Variant::Map& content, const Message& msg); + void handleLocateRequest(const Variant::List& content, const Message& msg); void handleMethodRequest(const Variant::Map& content, const Message& msg); void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); void dispatch(Message); void sendHeartbeat(); - bool predicateMatch(const Query&, const Data&); void flushResponses(AgentEvent&, bool); void periodicProcessing(uint64_t); void run(); @@ -166,14 +170,14 @@ void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); } AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), - bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), externalStorage(false), - autoAllowQueries(true), autoAllowMethods(true), + bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), + externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // // Set Capability Level to 1 // - attributes["_capability_level"] = 1; + attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8; if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -234,6 +238,9 @@ void AgentSessionImpl::open() directRx.setCapacity(64); topicRx.setCapacity(64); + directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}"); + topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}"); + // Start the receiver thread threadCanceled = false; thread = new qpid::sys::Thread(*this); @@ -280,18 +287,19 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) void AgentSessionImpl::registerSchema(Schema& schema) { - qpid::sys::Mutex::ScopedLock l(lock); - schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); - if (!schema.isFinalized()) schema.finalize(); - const SchemaId& schemaId(schema.getSchemaId()); - const string& packageName(schemaId.getPackageName()); - packages.insert(packageName); + qpid::sys::Mutex::ScopedLock l(lock); schemata[schemaId] = schema; schemaIndex[schemaId] = DataIndex(); + + // + // Get the news out at the next periodic interval that there is new schema information. + // + schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + forceHeartbeat = true; } @@ -380,34 +388,14 @@ void AgentSessionImpl::authAccept(AgentEvent& authEvent) map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId()); if (iter != schemaIndex.end()) for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++) - if (predicateMatch(query, dIter->second)) + if (query.matchesPredicate(dIter->second.getProperties())) response(event, dIter->second); } complete(event); return; } - const string& className(query.getClassName()); - const string& packageName(query.getPackageName()); - - if (className.empty()) { - raiseException(event, "Query is Invalid"); - return; - } - - { - qpid::sys::Mutex::ScopedLock l(lock); - map<SchemaId, DataIndex>::const_iterator sIter; - for (sIter = schemaIndex.begin(); sIter != schemaIndex.end(); sIter++) { - const SchemaId& schemaId(sIter->first); - if (schemaId.getName() == className && - (packageName.empty() || schemaId.getPackageName() == packageName)) - for (DataIndex::const_iterator dIter = sIter->second.begin(); dIter != sIter->second.end(); dIter++) - if (predicateMatch(query, dIter->second)) - response(event, dIter->second); - } - } - complete(event); + raiseException(event, "Query is Invalid"); } @@ -501,10 +489,6 @@ void AgentSessionImpl::raiseEvent(const Data& data) Variant::Map map; Variant::Map& headers(msg.getProperties()); - std::stringstream address; - - address << topicBase << "/agent.ind.event"; - // TODO: add severity.package.class to key // or modify to send only to subscriptions with matching queries @@ -513,13 +497,12 @@ void AgentSessionImpl::raiseEvent(const Data& data) headers["qmf.content"] = "_event"; headers["qmf.agent"] = agentName; headers["x-amqp-0-10.app-id"] = "qmf2"; + msg.setSubject("agent.ind.event"); encode(DataImplAccess::get(data).asMap(), msg); - Sender sender(session.createSender(address.str())); - sender.send(msg); - sender.close(); + topicSender.send(msg); - QPID_LOG(trace, "SENT EventIndication to=" << address.str()); + QPID_LOG(trace, "SENT EventIndication to=agent.ind.event"); } @@ -571,10 +554,19 @@ void AgentSessionImpl::setAgentName() } -void AgentSessionImpl::handleLocateRequest(const Variant::Map&, const Message& msg) +void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg) { QPID_LOG(trace, "RCVD AgentLocateRequest"); + if (!predicate.empty()) { + Query agentQuery(QUERY_OBJECT); + agentQuery.setPredicate(predicate); + if (!agentQuery.matchesPredicate(attributes)) { + QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring"); + return; + } + } + Message reply; Variant::Map map; Variant::Map& headers(reply.getProperties()); @@ -643,64 +635,72 @@ void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Mes // // Construct an AgentEvent to be sent to the application or directly handled by the agent. // + auto_ptr<QueryImpl> queryImpl(new QueryImpl(content)); auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY)); eventImpl->setUserId(msg.getUserId()); eventImpl->setReplyTo(msg.getReplyTo()); eventImpl->setCorrelationId(msg.getCorrelationId()); + eventImpl->setQuery(queryImpl.release()); + AgentEvent ae(eventImpl.release()); - Query query; - Variant::Map::const_iterator iter; - - iter = content.find("_what"); - if (iter == content.end()) { - QPID_LOG(error, "Received QueryRequest with no _what element"); + if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) { + handleSchemaRequest(ae); return; } - if (iter->second.asString() == "OBJECT") { - // - // This is an object query, handle the various flavors of query. - // - iter = content.find("_object_id"); - if (iter != content.end()) { - auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap())); - query = Query(DataAddr(addrImpl.release())); - } else { - iter = content.find("_schema_id"); - if (iter != content.end()) { - const Variant::Map& map(iter->second.asMap()); - string className; - string packageName; - - iter = map.find("_class_name"); - if (iter == map.end()) { - QPID_LOG(error, "Received QueryRequest with no invalid schemaId"); - return; - } + if (autoAllowQueries) + authAccept(ae); + else + enqueueEvent(ae); +} - className = iter->second.asString(); - iter = map.find("_package_name"); - if (iter != map.end()) - packageName = iter->second.asString(); - query = Query(className, packageName); - } else { - QPID_LOG(error, "Received QueryRequest with no valid elements"); - return; - } - } +void AgentSessionImpl::handleSchemaRequest(AgentEvent& event) +{ + SchemaMap::const_iterator iter; + string error; + const Query& query(event.getQuery()); + + Message msg; + Variant::List content; + Variant::Map map; + Variant::Map& headers(msg.getProperties()); - eventImpl->setQuery(query); + headers["method"] = "response"; + headers["qmf.opcode"] = "_query_response"; + headers["qmf.agent"] = agentName; + headers["x-amqp-0-10.app-id"] = "qmf2"; - if (autoAllowQueries) { - AgentEvent ae(eventImpl.release()); - authAccept(ae); - } else - enqueueEvent(AgentEvent(eventImpl.release())); + { + qpid::sys::Mutex::ScopedLock l(lock); + if (query.getTarget() == QUERY_SCHEMA_ID) { + headers["qmf.content"] = "_schema_id"; + for (iter = schemata.begin(); iter != schemata.end(); iter++) + content.push_back(SchemaIdImplAccess::get(iter->first).asMap()); + } else if (query.getSchemaId().isValid()) { + headers["qmf.content"] = "_schema"; + iter = schemata.find(query.getSchemaId()); + if (iter != schemata.end()) + content.push_back(SchemaImplAccess::get(iter->second).asMap()); + } else { + error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID."; + } + } - } else if (iter->second.asString() == "SCHEMA") { - // TODO: process a v2 schema request + if (!error.empty()) { + raiseException(event, error); + return; } + + AgentEventImpl& eventImpl(AgentEventImplAccess::get(event)); + + msg.setCorrelationId(eventImpl.getCorrelationId()); + encode(content, msg); + Sender sender(session.createSender(eventImpl.getReplyTo())); + sender.send(msg); + sender.close(); + + QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo()); } @@ -723,17 +723,20 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className); dataId.setHash(hash); - iter = schemata.find(dataId); - if (iter != schemata.end()) - replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); - else { - SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); - eventId.setHash(hash); + { + qpid::sys::Mutex::ScopedLock l(lock); iter = schemata.find(dataId); if (iter != schemata.end()) replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); - else - return; + else { + SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className); + eventId.setHash(hash); + iter = schemata.find(dataId); + if (iter != schemata.end()) + replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq); + else + return; + } } Message reply; @@ -765,23 +768,30 @@ void AgentSessionImpl::dispatch(Message msg) return; } - if (msg.getContentType() != "amqp/map") { - QPID_LOG(trace, "Message received with content type '" << msg.getContentType() << - "'. Expected 'amqp/map'"); - return; - } + const string& opcode = iter->second.asString(); - Variant::Map content; - decode(msg, content); + if (msg.getContentType() == "amqp/list") { + Variant::List content; + decode(msg, content); - const string& opcode = iter->second.asString(); + if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode); + } - if (opcode == "_agent_locate_request") handleLocateRequest(content, msg); - else if (opcode == "_method_request") handleMethodRequest(content, msg); - else if (opcode == "_query_request") handleQueryRequest(content, msg); - else { - QPID_LOG(trace, "Unknown QMFv2 opcode: " << opcode); + } else if (msg.getContentType() == "amqp/map") { + Variant::Map content; + decode(msg, content); + + if (opcode == "_method_request") handleMethodRequest(content, msg); + else if (opcode == "_query_request") handleQueryRequest(content, msg); + else { + QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode); + } + } else { + QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map"); } + } else { // // Dispatch a QMFv1 formatted message @@ -812,7 +822,7 @@ void AgentSessionImpl::sendHeartbeat() Variant::Map& headers(msg.getProperties()); std::stringstream address; - address << topicBase << "/agent.ind.heartbeat"; + address << "agent.ind.heartbeat"; // append .<vendor>.<product> to address key if present. Variant::Map::const_iterator v; @@ -827,6 +837,7 @@ void AgentSessionImpl::sendHeartbeat() headers["qmf.opcode"] = "_agent_heartbeat_indication"; headers["qmf.agent"] = agentName; headers["x-amqp-0-10.app-id"] = "qmf2"; + msg.setSubject(address.str()); map["_values"] = attributes; map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); @@ -835,17 +846,8 @@ void AgentSessionImpl::sendHeartbeat() map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime; encode(map, msg); - Sender sender = session.createSender(address.str()); - sender.send(msg); + topicSender.send(msg); QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName); - sender.close(); -} - - -bool AgentSessionImpl::predicateMatch(const Query&, const Data&) -{ - // TODO: Implement a proper predicate match - return true; } @@ -901,8 +903,9 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) // // If the hearbeat interval has elapsed, send a heartbeat. // - if (seconds - lastHeartbeat >= interval) { + if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) { lastHeartbeat = seconds; + forceHeartbeat = false; sendHeartbeat(); } @@ -941,4 +944,3 @@ void AgentSessionImpl::run() QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } - |
