diff options
| author | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-09-21 21:48:41 +0000 |
| commit | d47927b3e150057f6d615a0d00c8eff6c83320ac (patch) | |
| tree | 6cf1da8bd7a46fd3cef8251af94f88bbad0e627d /qpid/cpp/src/qmf/Agent.cpp | |
| parent | 81414cc0fb52efbd77e3e3bc83ed0c5dcb7fe83a (diff) | |
| download | qpid-python-d47927b3e150057f6d615a0d00c8eff6c83320ac.tar.gz | |
QMFv2 Additions:
- QMFv2 schema encoding completed
- Schema queries handled by the agent and initiated by the console by user request
- Full query support with predicates evaluated on the agent (regex not yet implemented)
- Agent filtering in the console
- Agent aging in the console
- Unit tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/Agent.cpp')
| -rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 151 |
1 files changed, 88 insertions, 63 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp index c7ccea35d5..05bf1a38aa 100644 --- a/qpid/cpp/src/qmf/Agent.cpp +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -25,6 +25,8 @@ #include "qmf/ConsoleSession.h" #include "qmf/DataImpl.h" #include "qmf/Query.h" +#include "qmf/SchemaImpl.h" +#include "qmf/agentCapability.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/AddressParser.h" #include "qpid/management/Buffer.h" @@ -51,6 +53,8 @@ string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; } string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; } const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); } const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); } +ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); } +uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); } ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); } ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); } uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); } @@ -66,11 +70,20 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : - name(n), epoch(e), session(s), touched(true), untouchedCount(0), + name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), nextCorrelator(1), schemaCache(s.schemaCache) { } +void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v) +{ + attributes[k] = v; + if (k == "qmf.agent_capability") + try { + capability = v.asUint32(); + } catch (std::exception&) {} +} + const Variant& AgentImpl::getAttribute(const string& k) const { Variant::Map::const_iterator iter = attributes.find(k); @@ -79,6 +92,7 @@ const Variant& AgentImpl::getAttribute(const string& k) const return iter->second; } + ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); @@ -258,7 +272,6 @@ SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout) { - qpid::sys::Mutex::ScopedLock l(lock); if (!schemaCache->haveSchema(id)) // // The desired schema is not in the cache. We need to asynchronously query the remote @@ -375,6 +388,14 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms if (aIter == props.end()) final = true; + aIter = props.find("qmf.content"); + if (aIter == props.end()) + return; + + string content_type(aIter->second.asString()); + if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data") + return; + try { correlator = boost::lexical_cast<uint32_t>(cid); } catch(const boost::bad_lexical_cast&) { correlator = 0; } @@ -392,12 +413,26 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms qpid::sys::Mutex::ScopedLock cl(context->lock); if (!context->response.isValid()) context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); - for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { - Data data(new DataImpl(lIter->asMap(), this)); - ConsoleEventImplAccess::get(context->response).addData(data); - if (data.hasSchema()) - learnSchemaId(data.getSchemaId()); - } + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + ConsoleEventImplAccess::get(context->response).addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + if (final) { ConsoleEventImplAccess::get(context->response).setFinal(); ConsoleEventImplAccess::get(context->response).setAgent(this); @@ -410,15 +445,30 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE)); eventImpl->setCorrelator(correlator); eventImpl->setAgent(this); - for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { - Data data(new DataImpl(lIter->asMap(), this)); - eventImpl->addData(data); - if (data.hasSchema()) - learnSchemaId(data.getSchemaId()); - } + + if (content_type == "_data") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Data data(new DataImpl(lIter->asMap(), this)); + eventImpl->addData(data); + if (data.hasSchema()) + learnSchemaId(data.getSchemaId()); + } + else if (content_type == "_schema_id") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + SchemaId schemaId(new SchemaIdImpl(lIter->asMap())); + eventImpl->addSchemaId(schemaId); + learnSchemaId(schemaId); + } + else if (content_type == "_schema") + for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) { + Schema schema(new SchemaImpl(lIter->asMap())); + schemaCache->declareSchema(schema); + } + if (final) eventImpl->setFinal(); - session.enqueueEvent(eventImpl.release()); + if (content_type != "_schema") + session.enqueueEvent(eventImpl.release()); } } @@ -441,14 +491,11 @@ Query AgentImpl::stringToQuery(const std::string& text) if (iter != map.end()) packageName = iter->second.asString(); - Query query(className, packageName); + Query query(QUERY_OBJECT, className, packageName); iter = map.find("where"); - if (iter != map.end()) { - const Variant::Map& pred(iter->second.asMap()); - for (iter = pred.begin(); iter != pred.end(); iter++) - query.addPredicate(iter->first, iter->second); - } + if (iter != map.end()) + query.setPredicate(iter->second.asList()); return query; } @@ -464,42 +511,11 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator) headers["qmf.opcode"] = "_query_request"; headers["x-amqp-0-10.app-id"] = "qmf2"; - map["_what"] = "OBJECT"; - - const DataAddr& dataAddr(query.getDataAddr()); - const SchemaId& schemaId(query.getSchemaId()); - - if (dataAddr.isValid()) - map["_object_id"] = dataAddr.asMap(); - else { - string className; - string packageName; - if (schemaId.isValid()) { - className = schemaId.getName(); - packageName = schemaId.getPackageName(); - } else { - className = query.getClassName(); - if (className.empty()) - throw QmfException("Invalid Query"); - packageName = query.getPackageName(); - } - Variant::Map idMap; - idMap["_class_name"] = className; - if (!packageName.empty()) - idMap["_package_name"] = packageName; - map["_schema_id"] = idMap; - } - - // - // TODO: Encode a simple-predicate if present. - // - msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); - encode(map, msg); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + msg.setSubject(name); + encode(QueryImplAccess::get(query).asMap(), msg); + session.directSender.send(msg); QPID_LOG(trace, "SENT QueryRequest to=" << name); } @@ -521,17 +537,27 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const msg.setReplyTo(session.replyAddress); msg.setCorrelationId(boost::lexical_cast<string>(correlator)); + msg.setSubject(name); encode(map, msg); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + session.directSender.send(msg); QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name); } void AgentImpl::sendSchemaRequest(const SchemaId& id) { - // TODO: Check agent's capability value to determine which kind of schema request to make + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } + + if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { + Query query(QUERY_SCHEMA, id); + sendQuery(query, correlator); + return; + } #define RAW_BUFFER_SIZE 1024 char rawBuffer[RAW_BUFFER_SIZE]; @@ -541,7 +567,7 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) buffer.putOctet('M'); buffer.putOctet('2'); buffer.putOctet('S'); - buffer.putLong(nextCorrelator++); + buffer.putLong(correlator); buffer.putShortString(id.getPackageName()); buffer.putShortString(id.getName()); buffer.putBin128(id.getHash().data()); @@ -551,9 +577,8 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id) Message msg; msg.setReplyTo(session.replyAddress); msg.setContent(content); - Sender sender(session.session.createSender(session.directBase + "/" + name)); - sender.send(msg); - sender.close(); + msg.setSubject(name); + session.directSender.send(msg); QPID_LOG(trace, "SENT V1SchemaRequest to=" << name); } |
