summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/Agent.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
committerTed Ross <tross@apache.org>2010-09-21 21:48:41 +0000
commitd47927b3e150057f6d615a0d00c8eff6c83320ac (patch)
tree6cf1da8bd7a46fd3cef8251af94f88bbad0e627d /qpid/cpp/src/qmf/Agent.cpp
parent81414cc0fb52efbd77e3e3bc83ed0c5dcb7fe83a (diff)
downloadqpid-python-d47927b3e150057f6d615a0d00c8eff6c83320ac.tar.gz
QMFv2 Additions:
- QMFv2 schema encoding completed - Schema queries handled by the agent and initiated by the console by user request - Full query support with predicates evaluated on the agent (regex not yet implemented) - Agent filtering in the console - Agent aging in the console - Unit tests git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@999662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/Agent.cpp')
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp151
1 files changed, 88 insertions, 63 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
index c7ccea35d5..05bf1a38aa 100644
--- a/qpid/cpp/src/qmf/Agent.cpp
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -25,6 +25,8 @@
#include "qmf/ConsoleSession.h"
#include "qmf/DataImpl.h"
#include "qmf/Query.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/agentCapability.h"
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/AddressParser.h"
#include "qpid/management/Buffer.h"
@@ -51,6 +53,8 @@ string Agent::getProduct() const { return isValid() ? impl->getProduct() : ""; }
string Agent::getInstance() const { return isValid() ? impl->getInstance() : ""; }
const Variant& Agent::getAttribute(const string& k) const { return impl->getAttribute(k); }
const Variant::Map& Agent::getAttributes() const { return impl->getAttributes(); }
+ConsoleEvent Agent::querySchema(Duration t) { return impl->querySchema(t); }
+uint32_t Agent::querySchemaAsync() { return impl->querySchemaAsync(); }
ConsoleEvent Agent::query(const Query& q, Duration t) { return impl->query(q, t); }
ConsoleEvent Agent::query(const string& q, Duration t) { return impl->query(q, t); }
uint32_t Agent::queryAsync(const Query& q) { return impl->queryAsync(q); }
@@ -66,11 +70,20 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
- name(n), epoch(e), session(s), touched(true), untouchedCount(0),
+ name(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
nextCorrelator(1), schemaCache(s.schemaCache)
{
}
+void AgentImpl::setAttribute(const std::string& k, const qpid::types::Variant& v)
+{
+ attributes[k] = v;
+ if (k == "qmf.agent_capability")
+ try {
+ capability = v.asUint32();
+ } catch (std::exception&) {}
+}
+
const Variant& AgentImpl::getAttribute(const string& k) const
{
Variant::Map::const_iterator iter = attributes.find(k);
@@ -79,6 +92,7 @@ const Variant& AgentImpl::getAttribute(const string& k) const
return iter->second;
}
+
ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
@@ -258,7 +272,6 @@ SchemaId AgentImpl::getSchemaId(const string& pname, uint32_t idx) const
Schema AgentImpl::getSchema(const SchemaId& id, Duration timeout)
{
- qpid::sys::Mutex::ScopedLock l(lock);
if (!schemaCache->haveSchema(id))
//
// The desired schema is not in the cache. We need to asynchronously query the remote
@@ -375,6 +388,14 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
if (aIter == props.end())
final = true;
+ aIter = props.find("qmf.content");
+ if (aIter == props.end())
+ return;
+
+ string content_type(aIter->second.asString());
+ if (content_type != "_schema" && content_type != "_schema_id" && content_type != "_data")
+ return;
+
try { correlator = boost::lexical_cast<uint32_t>(cid); }
catch(const boost::bad_lexical_cast&) { correlator = 0; }
@@ -392,12 +413,26 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
qpid::sys::Mutex::ScopedLock cl(context->lock);
if (!context->response.isValid())
context->response = ConsoleEvent(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
- for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
- Data data(new DataImpl(lIter->asMap(), this));
- ConsoleEventImplAccess::get(context->response).addData(data);
- if (data.hasSchema())
- learnSchemaId(data.getSchemaId());
- }
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ ConsoleEventImplAccess::get(context->response).addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ ConsoleEventImplAccess::get(context->response).addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
if (final) {
ConsoleEventImplAccess::get(context->response).setFinal();
ConsoleEventImplAccess::get(context->response).setAgent(this);
@@ -410,15 +445,30 @@ void AgentImpl::handleQueryResponse(const Variant::List& list, const Message& ms
auto_ptr<ConsoleEventImpl> eventImpl(new ConsoleEventImpl(CONSOLE_QUERY_RESPONSE));
eventImpl->setCorrelator(correlator);
eventImpl->setAgent(this);
- for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
- Data data(new DataImpl(lIter->asMap(), this));
- eventImpl->addData(data);
- if (data.hasSchema())
- learnSchemaId(data.getSchemaId());
- }
+
+ if (content_type == "_data")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Data data(new DataImpl(lIter->asMap(), this));
+ eventImpl->addData(data);
+ if (data.hasSchema())
+ learnSchemaId(data.getSchemaId());
+ }
+ else if (content_type == "_schema_id")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ SchemaId schemaId(new SchemaIdImpl(lIter->asMap()));
+ eventImpl->addSchemaId(schemaId);
+ learnSchemaId(schemaId);
+ }
+ else if (content_type == "_schema")
+ for (Variant::List::const_iterator lIter = list.begin(); lIter != list.end(); lIter++) {
+ Schema schema(new SchemaImpl(lIter->asMap()));
+ schemaCache->declareSchema(schema);
+ }
+
if (final)
eventImpl->setFinal();
- session.enqueueEvent(eventImpl.release());
+ if (content_type != "_schema")
+ session.enqueueEvent(eventImpl.release());
}
}
@@ -441,14 +491,11 @@ Query AgentImpl::stringToQuery(const std::string& text)
if (iter != map.end())
packageName = iter->second.asString();
- Query query(className, packageName);
+ Query query(QUERY_OBJECT, className, packageName);
iter = map.find("where");
- if (iter != map.end()) {
- const Variant::Map& pred(iter->second.asMap());
- for (iter = pred.begin(); iter != pred.end(); iter++)
- query.addPredicate(iter->first, iter->second);
- }
+ if (iter != map.end())
+ query.setPredicate(iter->second.asList());
return query;
}
@@ -464,42 +511,11 @@ void AgentImpl::sendQuery(const Query& query, uint32_t correlator)
headers["qmf.opcode"] = "_query_request";
headers["x-amqp-0-10.app-id"] = "qmf2";
- map["_what"] = "OBJECT";
-
- const DataAddr& dataAddr(query.getDataAddr());
- const SchemaId& schemaId(query.getSchemaId());
-
- if (dataAddr.isValid())
- map["_object_id"] = dataAddr.asMap();
- else {
- string className;
- string packageName;
- if (schemaId.isValid()) {
- className = schemaId.getName();
- packageName = schemaId.getPackageName();
- } else {
- className = query.getClassName();
- if (className.empty())
- throw QmfException("Invalid Query");
- packageName = query.getPackageName();
- }
- Variant::Map idMap;
- idMap["_class_name"] = className;
- if (!packageName.empty())
- idMap["_package_name"] = packageName;
- map["_schema_id"] = idMap;
- }
-
- //
- // TODO: Encode a simple-predicate if present.
- //
-
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
- encode(map, msg);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ msg.setSubject(name);
+ encode(QueryImplAccess::get(query).asMap(), msg);
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT QueryRequest to=" << name);
}
@@ -521,17 +537,27 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
msg.setReplyTo(session.replyAddress);
msg.setCorrelationId(boost::lexical_cast<string>(correlator));
+ msg.setSubject(name);
encode(map, msg);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT MethodRequest method=" << method << " to=" << name);
}
void AgentImpl::sendSchemaRequest(const SchemaId& id)
{
- // TODO: Check agent's capability value to determine which kind of schema request to make
+ uint32_t correlator;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
+ }
+
+ if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
+ Query query(QUERY_SCHEMA, id);
+ sendQuery(query, correlator);
+ return;
+ }
#define RAW_BUFFER_SIZE 1024
char rawBuffer[RAW_BUFFER_SIZE];
@@ -541,7 +567,7 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
buffer.putOctet('M');
buffer.putOctet('2');
buffer.putOctet('S');
- buffer.putLong(nextCorrelator++);
+ buffer.putLong(correlator);
buffer.putShortString(id.getPackageName());
buffer.putShortString(id.getName());
buffer.putBin128(id.getHash().data());
@@ -551,9 +577,8 @@ void AgentImpl::sendSchemaRequest(const SchemaId& id)
Message msg;
msg.setReplyTo(session.replyAddress);
msg.setContent(content);
- Sender sender(session.session.createSender(session.directBase + "/" + name));
- sender.send(msg);
- sender.close();
+ msg.setSubject(name);
+ session.directSender.send(msg);
QPID_LOG(trace, "SENT V1SchemaRequest to=" << name);
}