summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-11-20 14:51:45 +0000
committerTed Ross <tross@apache.org>2008-11-20 14:51:45 +0000
commitbe6a6d0014e60226607fdc8d7e5d67bd1eb13b4f (patch)
treef24d5b3257ccc1ae8902fa9af5d7ed1d1612bf30
parent57de045575c5004f04f84450c2296a3093dc8b2d (diff)
downloadqpid-python-be6a6d0014e60226607fdc8d7e5d67bd1eb13b4f.tar.gz
QPID-1476 - routing keys used for updates can't be used to discriminate by agent
- Fixed routing keys in agents and binding keys in consoles - Added some additional debug output for ManagementAgentImpl - Minor cleanup in the connection close path for ManagementAgentImpl git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@719245 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp25
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp12
-rw-r--r--python/qpid/qmfconsole.py6
-rw-r--r--ruby/lib/qpid/qmf.rb6
4 files changed, 30 insertions, 19 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 08aefa59d9..173785a671 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -192,6 +192,10 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
uint32_t outLen;
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
+ stringstream key;
+
+ key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ event.getPackageName() << "." << event.getEventName();
encodeHeader(outBuffer, 'e');
outBuffer.putShortString(event.getPackageName());
@@ -202,8 +206,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
event.encode(outBuffer);
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
- connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management",
- "console.event." + event.getPackageName() + "." + event.getEventName());
+ connThreadBody.sendBuffer(outBuffer, outLen, "qpid.management", key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -684,6 +687,10 @@ void ManagementAgentImpl::periodicProcessing()
moveNewObjectsLH();
+ if (debugLevel >= DEBUG_PUBLISH) {
+ cout << "Objects managed: " << managementObjects.size() << endl;
+ }
+
if (clientWasAdded) {
clientWasAdded = false;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
@@ -752,12 +759,16 @@ void ManagementAgentImpl::periodicProcessing()
if (contentSize > 0) {
msgBuffer.reset();
stringstream key;
- key << "console.obj." << baseObject->getPackageName() << "." << baseObject->getClassName() << "." <<
- assignedBrokerBank << "." << assignedAgentBank;
+ key << "console.obj." << assignedBrokerBank << "." << assignedAgentBank << "." <<
+ baseObject->getPackageName() << "." << baseObject->getClassName();
connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
}
}
+ if (debugLevel >= DEBUG_PUBLISH && !deleteList.empty()) {
+ cout << "Deleting " << deleteList.size() << " objects" << endl;
+ }
+
// Delete flagged objects
for (list<pair<ObjectId, ManagementObject*> >::reverse_iterator iter = deleteList.rbegin();
iter != deleteList.rend();
@@ -798,6 +809,8 @@ void ManagementAgentImpl::ConnectionThread::run()
cout << " Connection established" << endl;
{
Mutex::ScopedLock _lock(connLock);
+ if (shutdown)
+ return;
operational = true;
agent.startProtocol();
try {
@@ -809,12 +822,12 @@ void ManagementAgentImpl::ConnectionThread::run()
cout << "QMF Agent connection has been lost" << endl;
operational = false;
+ agent.connected = false;
}
delay = delayMin;
+ connection.close();
delete subscriptions;
subscriptions = 0;
- session.close();
- connection.close();
}
} catch (exception &e) {
if (delay < delayMax)
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index 23ef8d9e6a..48b73546b3 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -237,7 +237,7 @@ void ManagementBroker::raiseEvent(const ManagementEvent& event, severity_t sever
outLen = MA_BUFFER_SIZE - outBuffer.available();
outBuffer.reset();
sendBuffer(outBuffer, outLen, mExchange,
- "console.event." + event.getPackageName() + "." + event.getEventName());
+ "console.event.1.0." + event.getPackageName() + "." + event.getEventName());
}
ManagementBroker::Periodic::Periodic (ManagementBroker& _broker, uint32_t _seconds)
@@ -357,13 +357,11 @@ void ManagementBroker::periodicProcessing (void)
moveNewObjectsLH();
- if (clientWasAdded)
- {
+ if (clientWasAdded) {
clientWasAdded = false;
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
iter != managementObjects.end ();
- iter++)
- {
+ iter++) {
ManagementObject* object = iter->second;
object->setAllChanged ();
}
@@ -386,7 +384,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
+ routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -398,7 +396,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
+ routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName();
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
diff --git a/python/qpid/qmfconsole.py b/python/qpid/qmfconsole.py
index aa4b149601..bdd93e6f94 100644
--- a/python/qpid/qmfconsole.py
+++ b/python/qpid/qmfconsole.py
@@ -224,7 +224,7 @@ class Session:
raise Exception("userBindings option not set for Session")
for broker in self.brokers:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.%s.#" % packageName)
+ binding_key="console.obj.*.*.%s.#" % packageName)
def bindClass(self, classKey):
""" """
@@ -233,7 +233,7 @@ class Session:
pname, cname, hash = classKey
for broker in self.brokers:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.%s.%s.#" % (pname, cname))
+ binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -370,7 +370,7 @@ class Session:
if self.rcvObjects and not self.userBindings:
keyList.append("console.obj.#")
else:
- keyList.append("console.obj.org.apache.qpid.broker.agent")
+ keyList.append("console.obj.*.*.org.apache.qpid.broker.agent")
if self.rcvEvents:
keyList.append("console.event.#")
if self.rcvHeartbeats:
diff --git a/ruby/lib/qpid/qmf.rb b/ruby/lib/qpid/qmf.rb
index 2b7ab58e96..378d4068be 100644
--- a/ruby/lib/qpid/qmf.rb
+++ b/ruby/lib/qpid/qmf.rb
@@ -217,7 +217,7 @@ module Qpid::Qmf
@brokers.each do |broker|
args = { :exchange => "qpid.management",
:queue => broker.topicName,
- :binding_key => "console.obj.#{package_name}.#" }
+ :binding_key => "console.obj.*.*.#{package_name}.#" }
broker.amqpSession.exchange_bind(args)
end
end
@@ -230,7 +230,7 @@ module Qpid::Qmf
@brokers.each do |broker|
args = { :exchange => "qpid.management",
:queue => broker.topicName,
- :binding_key => "console.obj.#{pname}.#{cname}.#" }
+ :binding_key => "console.obj.*.*.#{pname}.#{cname}.#" }
broker.amqpSession.exchange_bind(args)
end
end
@@ -637,7 +637,7 @@ module Qpid::Qmf
if @rcv_objects && ! @user_bindings
key_list << "console.obj.#"
else
- key_list << "console.obj.org.apache.qpid.broker.agent"
+ key_list << "console.obj.*.*.org.apache.qpid.broker.agent"
end
key_list << "console.event.#" if @rcv_events
key_list << "console.heartbeat.#" if @rcv_heartbeats