summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp233
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h8
2 files changed, 138 insertions, 103 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 351e0bfd00..d431e4ca16 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -88,6 +88,8 @@ ManagementAgentImpl::ManagementAgentImpl() :
interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
notifyable(0), inCallback(false),
initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
+ topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"),
+ schemaTimestamp(Duration(EPOCH, now())),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
@@ -296,7 +298,8 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
- event.getMd5Sum());
+ event.getMd5Sum(),
+ ManagementItem::CLASS_KIND_EVENT);
event.mapEncode(values);
map_["_values"] = values;
map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
@@ -308,7 +311,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["qmf.agent"] = name_address;
MapCodec::encode(map_, content);
- connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str());
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -404,7 +407,6 @@ void ManagementAgentImpl::retrieveData()
void ManagementAgentImpl::sendHeartbeat()
{
- static const string addr_exchange("qmf.default.topic");
static const string addr_key_base("agent.ind.heartbeat");
Variant::Map map;
@@ -429,13 +431,9 @@ void ManagementAgentImpl::sendHeartbeat()
headers["qmf.opcode"] = "_agent_heartbeat_indication";
headers["qmf.agent"] = name_address;
- map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
-
+ getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str());
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str());
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
@@ -443,8 +441,6 @@ void ManagementAgentImpl::sendHeartbeat()
void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
const string& text, uint32_t code)
{
- static const string addr_exchange("qmf.default.direct");
-
Variant::Map map;
Variant::Map headers;
Variant::Map values;
@@ -459,7 +455,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string&
map["_values"] = values;
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
@@ -573,7 +569,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
headers["qmf.opcode"] = "_method_response";
QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
MapCodec::encode(outMap, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
}
}
@@ -590,7 +586,6 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
headers["partial"] = Variant();
@@ -614,80 +609,25 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
return;
}
- if (i->second.asString() != "OBJECT") {
- sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
- return;
- }
-
- string className;
- string packageName;
-
- /*
- * Handle the _schema_id element, if supplied.
- */
- i = inMap.find("_schema_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- const Variant::Map& schemaIdMap(i->second.asMap());
-
- Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- className = s_iter->second.asString();
-
- s_iter = schemaIdMap.find("_package_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- packageName = s_iter->second.asString();
- }
-
- /*
- * Unpack the _object_id element of the query if it is present. If it is present, find that one
- * object and return it. If it is not present, send a class-based result.
- */
- i = inMap.find("_object_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- ObjectId objId(i->second.asMap());
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- objId.mapEncode(oidMap);
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- object->writeTimestamps(map_);
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
-
- list_.push_back(map_);
- headers.erase("partial");
-
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
- return;
- }
- } else {
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className &&
- (packageName.empty() || object->getPackageName() == packageName)) {
+ if (i->second.asString() == "OBJECT") {
+ headers["qmf.content"] = "_data";
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ ObjectId objId(i->second.asMap());
- // @todo support multiple object reply per message
- values.clear();
- list_.clear();
- oidMap.clear();
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
object->mapEncodeValues(values, true, true); // write both stats and properties
- iter->first.mapEncode(oidMap);
+ objId.mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
object->writeTimestamps(map_);
@@ -695,26 +635,101 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
object->getClassName(),
object->getMd5Sum());
list_.push_back(map_);
+ headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
+ }
+ } else { // match using schema_id, if supplied
+
+ string className;
+ string packageName;
+
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
+
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+ // @todo support multiple object reply per message
+ values.clear();
+ list_.clear();
+ oidMap.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ object->writeTimestamps(map_);
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ }
+ }
}
}
- }
- // Send empty "non-partial" message to indicate CommandComplete
- list_.clear();
- headers.erase("partial");
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+ // Send empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+
+ } else if (i->second.asString() == "SCHEMA_ID") {
+ headers["qmf.content"] = "_schema_id";
+ /**
+ * @todo - support for a predicate. For now, send a list of all known schema class keys.
+ */
+ for (PackageMap::iterator pIter = packages.begin();
+ pIter != packages.end(); pIter++) {
+ for (ClassMap::iterator cIter = pIter->second.begin();
+ cIter != pIter->second.end(); cIter++) {
+
+ list_.push_back(mapEncodeSchemaId( pIter->first,
+ cIter->first.name,
+ cIter->first.hash,
+ cIter->second.kind ));
+ }
+ }
+
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+
+ } else {
+ // Unknown query target
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ }
}
void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
- static const string addr_exchange("qmf.default.direct");
Variant::Map map;
Variant::Map headers;
@@ -724,13 +739,9 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid,
headers["qmf.opcode"] = "_agent_locate_response";
headers["qmf.agent"] = name_address;
- map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
-
+ getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
@@ -822,13 +833,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
const string& cname,
- const uint8_t *md5Sum)
+ const uint8_t *md5Sum,
+ uint8_t type)
{
Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
map_["_hash"] = types::Uuid(md5Sum);
+ if (type == ManagementItem::CLASS_KIND_EVENT)
+ map_["_type"] = "_event";
+ else
+ map_["_type"] = "_data";
+
return map_;
}
@@ -901,6 +918,8 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
// No such class found, create a new class with local information.
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
+ schemaTimestamp = Duration(EPOCH, now());
+ QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp));
}
void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
@@ -1014,7 +1033,7 @@ void ManagementAgentImpl::periodicProcessing()
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, "agent.ind.data", "amqp/list");
QPID_LOG(trace, "SENT DataIndication");
}
}
@@ -1031,6 +1050,16 @@ void ManagementAgentImpl::periodicProcessing()
sendHeartbeat();
}
+
+void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
+{
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
+ map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp);
+}
+
void ManagementAgentImpl::ConnectionThread::run()
{
static const int delayMin(1);
@@ -1055,9 +1084,9 @@ void ManagementAgentImpl::ConnectionThread::run()
arg::exclusive=true);
session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
arg::bindingKey=queueName.str());
- session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+ session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(),
arg::bindingKey=agent.name_address);
- session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+ session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(),
arg::bindingKey="console.#");
subscriptions->subscribe(agent, queueName.str(), dest);
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 4a58807e98..32cbbd7e08 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -32,6 +32,7 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/Time.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
@@ -166,6 +167,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
bool connected;
bool useMapMsg;
std::string lastFailure;
+ std::string topicExchange;
+ std::string directExchange;
+ qpid::sys::Duration schemaTimestamp;
bool clientWasAdded;
uint32_t requestedBrokerBank;
@@ -258,7 +262,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
- const uint8_t *md5Sum);
+ const uint8_t *md5Sum,
+ uint8_t type=ManagementItem::CLASS_KIND_TABLE);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendHeartbeat();
void sendException(const std::string& replyToKey, const std::string& cid,
@@ -272,6 +277,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
+ void getHeartbeatContent (qpid::types::Variant::Map& map);
};
}}