summaryrefslogtreecommitdiff
path: root/cpp/src/qmf/AgentSession.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf/AgentSession.cpp')
-rw-r--r--cpp/src/qmf/AgentSession.cpp250
1 files changed, 126 insertions, 124 deletions
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 28c324cc02..eca48d6b83 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -28,7 +28,8 @@
#include "qmf/SchemaImpl.h"
#include "qmf/DataAddrImpl.h"
#include "qmf/DataImpl.h"
-#include "qmf/Query.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/Condition.h"
#include "qpid/sys/Thread.h"
@@ -86,11 +87,14 @@ namespace qmf {
private:
typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
mutable qpid::sys::Mutex lock;
qpid::sys::Condition cond;
Connection connection;
Session session;
+ Sender directSender;
+ Sender topicSender;
string domain;
Variant::Map attributes;
Variant::Map options;
@@ -103,6 +107,7 @@ namespace qmf {
uint32_t interval;
uint64_t lastHeartbeat;
uint64_t lastVisit;
+ bool forceHeartbeat;
bool externalStorage;
bool autoAllowQueries;
bool autoAllowMethods;
@@ -110,21 +115,20 @@ namespace qmf {
string directBase;
string topicBase;
- set<string> packages;
- map<SchemaId, Schema, SchemaIdCompare> schemata;
+ SchemaMap schemata;
DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompare> schemaIndex;
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
void checkOpen();
void setAgentName();
void enqueueEvent(const AgentEvent&);
- void handleLocateRequest(const Variant::Map& content, const Message& msg);
+ void handleLocateRequest(const Variant::List& content, const Message& msg);
void handleMethodRequest(const Variant::Map& content, const Message& msg);
void handleQueryRequest(const Variant::Map& content, const Message& msg);
+ void handleSchemaRequest(AgentEvent&);
void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
void dispatch(Message);
void sendHeartbeat();
- bool predicateMatch(const Query&, const Data&);
void flushResponses(AgentEvent&, bool);
void periodicProcessing(uint64_t);
void run();
@@ -166,14 +170,14 @@ void AgentSession::raiseEvent(const Data& d) { impl->raiseEvent(d); }
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
- bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), externalStorage(false),
- autoAllowQueries(true), autoAllowMethods(true),
+ bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
+ externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
// Set Capability Level to 1
//
- attributes["_capability_level"] = 1;
+ attributes["qmf.agent_capability"] = AGENT_CAPABILITY_0_8;
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -234,6 +238,9 @@ void AgentSessionImpl::open()
directRx.setCapacity(64);
topicRx.setCapacity(64);
+ directSender = session.createSender(directBase + ";{create:never,node:{type:topic}}");
+ topicSender = session.createSender(topicBase + ";{create:never,node:{type:topic}}");
+
// Start the receiver thread
threadCanceled = false;
thread = new qpid::sys::Thread(*this);
@@ -280,18 +287,19 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
void AgentSessionImpl::registerSchema(Schema& schema)
{
- qpid::sys::Mutex::ScopedLock l(lock);
- schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
-
if (!schema.isFinalized())
schema.finalize();
-
const SchemaId& schemaId(schema.getSchemaId());
- const string& packageName(schemaId.getPackageName());
- packages.insert(packageName);
+ qpid::sys::Mutex::ScopedLock l(lock);
schemata[schemaId] = schema;
schemaIndex[schemaId] = DataIndex();
+
+ //
+ // Get the news out at the next periodic interval that there is new schema information.
+ //
+ schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
+ forceHeartbeat = true;
}
@@ -380,34 +388,14 @@ void AgentSessionImpl::authAccept(AgentEvent& authEvent)
map<SchemaId, DataIndex>::const_iterator iter = schemaIndex.find(query.getSchemaId());
if (iter != schemaIndex.end())
for (DataIndex::const_iterator dIter = iter->second.begin(); dIter != iter->second.end(); dIter++)
- if (predicateMatch(query, dIter->second))
+ if (query.matchesPredicate(dIter->second.getProperties()))
response(event, dIter->second);
}
complete(event);
return;
}
- const string& className(query.getClassName());
- const string& packageName(query.getPackageName());
-
- if (className.empty()) {
- raiseException(event, "Query is Invalid");
- return;
- }
-
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- map<SchemaId, DataIndex>::const_iterator sIter;
- for (sIter = schemaIndex.begin(); sIter != schemaIndex.end(); sIter++) {
- const SchemaId& schemaId(sIter->first);
- if (schemaId.getName() == className &&
- (packageName.empty() || schemaId.getPackageName() == packageName))
- for (DataIndex::const_iterator dIter = sIter->second.begin(); dIter != sIter->second.end(); dIter++)
- if (predicateMatch(query, dIter->second))
- response(event, dIter->second);
- }
- }
- complete(event);
+ raiseException(event, "Query is Invalid");
}
@@ -501,10 +489,6 @@ void AgentSessionImpl::raiseEvent(const Data& data)
Variant::Map map;
Variant::Map& headers(msg.getProperties());
- std::stringstream address;
-
- address << topicBase << "/agent.ind.event";
-
// TODO: add severity.package.class to key
// or modify to send only to subscriptions with matching queries
@@ -513,13 +497,12 @@ void AgentSessionImpl::raiseEvent(const Data& data)
headers["qmf.content"] = "_event";
headers["qmf.agent"] = agentName;
headers["x-amqp-0-10.app-id"] = "qmf2";
+ msg.setSubject("agent.ind.event");
encode(DataImplAccess::get(data).asMap(), msg);
- Sender sender(session.createSender(address.str()));
- sender.send(msg);
- sender.close();
+ topicSender.send(msg);
- QPID_LOG(trace, "SENT EventIndication to=" << address.str());
+ QPID_LOG(trace, "SENT EventIndication to=agent.ind.event");
}
@@ -571,10 +554,19 @@ void AgentSessionImpl::setAgentName()
}
-void AgentSessionImpl::handleLocateRequest(const Variant::Map&, const Message& msg)
+void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const Message& msg)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
+ if (!predicate.empty()) {
+ Query agentQuery(QUERY_OBJECT);
+ agentQuery.setPredicate(predicate);
+ if (!agentQuery.matchesPredicate(attributes)) {
+ QPID_LOG(trace, "AgentLocate predicate does not match this agent, ignoring");
+ return;
+ }
+ }
+
Message reply;
Variant::Map map;
Variant::Map& headers(reply.getProperties());
@@ -643,64 +635,72 @@ void AgentSessionImpl::handleQueryRequest(const Variant::Map& content, const Mes
//
// Construct an AgentEvent to be sent to the application or directly handled by the agent.
//
+ auto_ptr<QueryImpl> queryImpl(new QueryImpl(content));
auto_ptr<AgentEventImpl> eventImpl(new AgentEventImpl(AGENT_AUTH_QUERY));
eventImpl->setUserId(msg.getUserId());
eventImpl->setReplyTo(msg.getReplyTo());
eventImpl->setCorrelationId(msg.getCorrelationId());
+ eventImpl->setQuery(queryImpl.release());
+ AgentEvent ae(eventImpl.release());
- Query query;
- Variant::Map::const_iterator iter;
-
- iter = content.find("_what");
- if (iter == content.end()) {
- QPID_LOG(error, "Received QueryRequest with no _what element");
+ if (ae.getQuery().getTarget() == QUERY_SCHEMA_ID || ae.getQuery().getTarget() == QUERY_SCHEMA) {
+ handleSchemaRequest(ae);
return;
}
- if (iter->second.asString() == "OBJECT") {
- //
- // This is an object query, handle the various flavors of query.
- //
- iter = content.find("_object_id");
- if (iter != content.end()) {
- auto_ptr<DataAddrImpl> addrImpl(new DataAddrImpl(iter->second.asMap()));
- query = Query(DataAddr(addrImpl.release()));
- } else {
- iter = content.find("_schema_id");
- if (iter != content.end()) {
- const Variant::Map& map(iter->second.asMap());
- string className;
- string packageName;
-
- iter = map.find("_class_name");
- if (iter == map.end()) {
- QPID_LOG(error, "Received QueryRequest with no invalid schemaId");
- return;
- }
+ if (autoAllowQueries)
+ authAccept(ae);
+ else
+ enqueueEvent(ae);
+}
- className = iter->second.asString();
- iter = map.find("_package_name");
- if (iter != map.end())
- packageName = iter->second.asString();
- query = Query(className, packageName);
- } else {
- QPID_LOG(error, "Received QueryRequest with no valid elements");
- return;
- }
- }
+void AgentSessionImpl::handleSchemaRequest(AgentEvent& event)
+{
+ SchemaMap::const_iterator iter;
+ string error;
+ const Query& query(event.getQuery());
+
+ Message msg;
+ Variant::List content;
+ Variant::Map map;
+ Variant::Map& headers(msg.getProperties());
- eventImpl->setQuery(query);
+ headers["method"] = "response";
+ headers["qmf.opcode"] = "_query_response";
+ headers["qmf.agent"] = agentName;
+ headers["x-amqp-0-10.app-id"] = "qmf2";
- if (autoAllowQueries) {
- AgentEvent ae(eventImpl.release());
- authAccept(ae);
- } else
- enqueueEvent(AgentEvent(eventImpl.release()));
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ if (query.getTarget() == QUERY_SCHEMA_ID) {
+ headers["qmf.content"] = "_schema_id";
+ for (iter = schemata.begin(); iter != schemata.end(); iter++)
+ content.push_back(SchemaIdImplAccess::get(iter->first).asMap());
+ } else if (query.getSchemaId().isValid()) {
+ headers["qmf.content"] = "_schema";
+ iter = schemata.find(query.getSchemaId());
+ if (iter != schemata.end())
+ content.push_back(SchemaImplAccess::get(iter->second).asMap());
+ } else {
+ error = "Invalid Schema Query: Requests for SCHEMA must supply a valid schema ID.";
+ }
+ }
- } else if (iter->second.asString() == "SCHEMA") {
- // TODO: process a v2 schema request
+ if (!error.empty()) {
+ raiseException(event, error);
+ return;
}
+
+ AgentEventImpl& eventImpl(AgentEventImplAccess::get(event));
+
+ msg.setCorrelationId(eventImpl.getCorrelationId());
+ encode(content, msg);
+ Sender sender(session.createSender(eventImpl.getReplyTo()));
+ sender.send(msg);
+ sender.close();
+
+ QPID_LOG(trace, "SENT QueryResponse(Schema) to=" << eventImpl.getReplyTo());
}
@@ -723,17 +723,20 @@ void AgentSessionImpl::handleV1SchemaRequest(qpid::management::Buffer& buffer, u
SchemaId dataId(SCHEMA_TYPE_DATA, packageName, className);
dataId.setHash(hash);
- iter = schemata.find(dataId);
- if (iter != schemata.end())
- replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
- else {
- SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
- eventId.setHash(hash);
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
iter = schemata.find(dataId);
if (iter != schemata.end())
replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
- else
- return;
+ else {
+ SchemaId eventId(SCHEMA_TYPE_EVENT, packageName, className);
+ eventId.setHash(hash);
+ iter = schemata.find(dataId);
+ if (iter != schemata.end())
+ replyContent = SchemaImplAccess::get(iter->second).asV1Content(seq);
+ else
+ return;
+ }
}
Message reply;
@@ -765,23 +768,30 @@ void AgentSessionImpl::dispatch(Message msg)
return;
}
- if (msg.getContentType() != "amqp/map") {
- QPID_LOG(trace, "Message received with content type '" << msg.getContentType() <<
- "'. Expected 'amqp/map'");
- return;
- }
+ const string& opcode = iter->second.asString();
- Variant::Map content;
- decode(msg, content);
+ if (msg.getContentType() == "amqp/list") {
+ Variant::List content;
+ decode(msg, content);
- const string& opcode = iter->second.asString();
+ if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/list' content: " << opcode);
+ }
- if (opcode == "_agent_locate_request") handleLocateRequest(content, msg);
- else if (opcode == "_method_request") handleMethodRequest(content, msg);
- else if (opcode == "_query_request") handleQueryRequest(content, msg);
- else {
- QPID_LOG(trace, "Unknown QMFv2 opcode: " << opcode);
+ } else if (msg.getContentType() == "amqp/map") {
+ Variant::Map content;
+ decode(msg, content);
+
+ if (opcode == "_method_request") handleMethodRequest(content, msg);
+ else if (opcode == "_query_request") handleQueryRequest(content, msg);
+ else {
+ QPID_LOG(trace, "Unexpected QMFv2 opcode with 'amqp/map' content: " << opcode);
+ }
+ } else {
+ QPID_LOG(trace, "Unexpected QMFv2 content type. Expected amqp/list or amqp/map");
}
+
} else {
//
// Dispatch a QMFv1 formatted message
@@ -812,7 +822,7 @@ void AgentSessionImpl::sendHeartbeat()
Variant::Map& headers(msg.getProperties());
std::stringstream address;
- address << topicBase << "/agent.ind.heartbeat";
+ address << "agent.ind.heartbeat";
// append .<vendor>.<product> to address key if present.
Variant::Map::const_iterator v;
@@ -827,6 +837,7 @@ void AgentSessionImpl::sendHeartbeat()
headers["qmf.opcode"] = "_agent_heartbeat_indication";
headers["qmf.agent"] = agentName;
headers["x-amqp-0-10.app-id"] = "qmf2";
+ msg.setSubject(address.str());
map["_values"] = attributes;
map["_values"].asMap()["timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()));
@@ -835,17 +846,8 @@ void AgentSessionImpl::sendHeartbeat()
map["_values"].asMap()["schemaUpdated"] = schemaUpdateTime;
encode(map, msg);
- Sender sender = session.createSender(address.str());
- sender.send(msg);
+ topicSender.send(msg);
QPID_LOG(trace, "SENT AgentHeartbeat name=" << agentName);
- sender.close();
-}
-
-
-bool AgentSessionImpl::predicateMatch(const Query&, const Data&)
-{
- // TODO: Implement a proper predicate match
- return true;
}
@@ -901,8 +903,9 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
//
// If the hearbeat interval has elapsed, send a heartbeat.
//
- if (seconds - lastHeartbeat >= interval) {
+ if (forceHeartbeat || (seconds - lastHeartbeat >= interval)) {
lastHeartbeat = seconds;
+ forceHeartbeat = false;
sendHeartbeat();
}
@@ -941,4 +944,3 @@ void AgentSessionImpl::run()
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
-