summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-05-18 20:34:34 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-05-18 20:34:34 +0000
commitb178a0dce4a6cffbdc6c1d2b56eb6d828775dd0c (patch)
tree0f2da6316a6dfb0babcec91cacc43e2110de7b99
parentc43784b1ab25e5214ca801ae2b3f92ce66958f51 (diff)
downloadqpid-python-b178a0dce4a6cffbdc6c1d2b56eb6d828775dd0c.tar.gz
QMF: allow consoles to filter agent heartbeats based on agent identification.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@945871 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp23
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp15
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py28
3 files changed, 60 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 890ecd2ca2..5c2c6c54e6 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -50,6 +50,9 @@ namespace {
bool disabled = false;
ManagementAgent* agent = 0;
int refCount = 0;
+
+ const string defaultVendorName("vendor");
+ const string defaultProductName("product");
}
ManagementAgent::Singleton::Singleton(bool disableManagement)
@@ -176,7 +179,7 @@ void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
storeData(true);
if (attrMap.empty())
- setName("vendor", "product");
+ setName(defaultVendorName, defaultProductName);
initialized = true;
}
@@ -361,11 +364,25 @@ void ManagementAgentImpl::retrieveData()
void ManagementAgentImpl::sendHeartbeat()
{
static const string addr_exchange("qmf.default.topic");
- static const string addr_key("agent.ind.heartbeat");
+ static const string addr_key_base("agent.ind.heartbeat");
Variant::Map map;
Variant::Map headers;
string content;
+ std::stringstream addr_key;
+
+ addr_key << addr_key_base;
+
+ // append .<vendor>.<product> to address key if present.
+ Variant::Map::const_iterator v;
+ if ((v = attrMap.find("_vendor")) != attrMap.end() &&
+ v->second.getString() != defaultVendorName) {
+ addr_key << "." << v->second.getString();
+ if ((v = attrMap.find("_product")) != attrMap.end() &&
+ v->second.getString() != defaultProductName) {
+ addr_key << "." << v->second.getString();
+ }
+ }
headers["method"] = "indication";
headers["qmf.opcode"] = "_agent_heartbeat_indication";
@@ -377,7 +394,7 @@ void ManagementAgentImpl::sendHeartbeat()
map["_values"].asMap()["epoch"] = bootSequence;
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key);
+ connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str());
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 919fcc22f7..92f9d799f9 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -801,7 +801,18 @@ void ManagementAgent::periodicProcessing (void)
}
if (qmf2Support) {
- static const string addr_key("agent.ind.heartbeat");
+ std::stringstream addr_key;
+
+ addr_key << "agent.ind.heartbeat";
+
+ // append .<vendor>.<product> to address key if present.
+ Variant::Map::const_iterator v;
+ if ((v = attrMap.find("_vendor")) != attrMap.end()){
+ addr_key << "." << v->second.getString();
+ if ((v = attrMap.find("_product")) != attrMap.end()) {
+ addr_key << "." << v->second.getString();
+ }
+ }
Variant::Map map;
Variant::Map headers;
@@ -817,7 +828,7 @@ void ManagementAgent::periodicProcessing (void)
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key);
+ sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str());
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 2e1f98a161..fc301fe296 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -574,6 +574,7 @@ class Session:
self.rcvHeartbeats = False
self.v1BindingKeyList, self.v2BindingKeyList = self._bindingKeys()
self.manageConnections = manageConnections
+ self.agent_filter = [] # (vendor, product, instance)
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
@@ -802,15 +803,40 @@ class Session:
""" """
pass
+ def addAgentFilter(self, vendor, product=None):
+ """ Listen for heartbeat messages only for those agent(s) that match the
+ vendor and, optionally, the product strings.
+ """
+ key = "agent.ind.heartbeat." + vendor
+ if product is not None:
+ key += "." + product
+ key += ".#"
+
+ if key not in self.v2BindingKeyList:
+ self.v2BindingKeyList.append(key)
+ self.agent_filter.append((vendor, product, None))
+
+ # be sure we don't ever filter the local broker
+ local_broker_key = "agent.ind.heartbeat.apache.org.qpidd"
+ if local_broker_key not in self.v2BindingKeyList:
+ self.v2BindingKeyList.append(local_broker_key)
+
+ # remove the wildcard key if present
+ try:
+ self.v2BindingKeyList.remove("agent.ind.heartbeat.#")
+ except:
+ pass
def _bindingKeys(self):
+ """ The set of default key bindings."""
v1KeyList = []
v2KeyList = []
v1KeyList.append("schema.#")
v2KeyList.append("agent.ind.heartbeat.#")
if self.rcvObjects and self.rcvEvents and self.rcvHeartbeats and not self.userBindings:
v1KeyList.append("console.#")
- v2KeyList.append("agent.#")
+ v2KeyList.append("agent.ind.data.#")
+ v2KeyList.append("agent.ind.event.#")
else:
if self.rcvObjects and not self.userBindings:
v1KeyList.append("console.obj.#")