summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-11-13 04:15:15 +0000
committerTed Ross <tross@apache.org>2008-11-13 04:15:15 +0000
commitb15caaa061091efcecfd1e8704e6f2c6bde4e03e (patch)
treef523e1396f2994b97fb98b0db91a266e82e67d99 /qpid
parent4a32a1d626f30b7679eaecdf98d0c3330b4d4c08 (diff)
downloadqpid-python-b15caaa061091efcecfd1e8704e6f2c6bde4e03e.tar.gz
Updated qmf-agent API to allow user to specify uid, password, mechanism, and protocol.
Fixed qmf-console bug related to routing keys of object messages. Pass the binding key into the management agent to allow for selective broadcast of object data. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@713631 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgent.h22
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp45
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h27
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.cpp23
-rw-r--r--qpid/cpp/src/qpid/management/ManagementBroker.h22
-rw-r--r--qpid/cpp/src/qpid/management/ManagementExchange.cpp4
-rw-r--r--qpid/python/qpid/qmfconsole.py13
-rw-r--r--qpid/ruby/lib/qpid/qmf.rb4
8 files changed, 89 insertions, 71 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgent.h b/qpid/cpp/src/qpid/agent/ManagementAgent.h
index 03baa10aa2..296bb17e62 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgent.h
@@ -81,24 +81,28 @@ class ManagementAgent
// storeFile - File where this process has read and write access. This
// file shall be used to store persistent state.
//
- virtual void init (std::string brokerHost = "localhost",
- uint16_t brokerPort = 5672,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- std::string storeFile = "") = 0;
+ virtual void init(const std::string& brokerHost = "localhost",
+ uint16_t brokerPort = 5672,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false,
+ const std::string& storeFile = "",
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& mech = "PLAIN",
+ const std::string& proto = "tcp") = 0;
// Register a schema with the management agent. This is normally called by the
// package initializer generated by the management code generator.
//
virtual void
- registerClass(std::string& packageName,
- std::string& className,
+ registerClass(const std::string& packageName,
+ const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall) = 0;
virtual void
- registerEvent(std::string& packageName,
- std::string& eventName,
+ registerEvent(const std::string& packageName,
+ const std::string& eventName,
uint8_t* md5Sum,
management::ManagementEvent::writeSchemaCall_t schemaCall) = 0;
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 06acf6b0e3..3f863d41d7 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -80,7 +80,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
extThread(false), writeFd(-1), readFd(-1),
- connected(false), lastFailure("never connected"),
+ initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), debugLevel(0),
connThreadBody(*this), connThread(connThreadBody),
@@ -102,18 +102,26 @@ ManagementAgentImpl::~ManagementAgentImpl()
}
}
-void ManagementAgentImpl::init(string brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread,
- string _storeFile)
+void ManagementAgentImpl::init(const string& brokerHost,
+ uint16_t brokerPort,
+ uint16_t intervalSeconds,
+ bool useExternalThread,
+ const string& _storeFile,
+ const string& uid,
+ const string& pwd,
+ const string& mech,
+ const string& proto)
{
interval = intervalSeconds;
extThread = useExternalThread;
storeFile = _storeFile;
nextObjectId = 1;
- host = brokerHost;
- port = brokerPort;
+ connectionSettings.protocol = proto;
+ connectionSettings.host = brokerHost;
+ connectionSettings.port = brokerPort;
+ connectionSettings.username = uid;
+ connectionSettings.password = pwd;
+ connectionSettings.mechanism = mech;
if (debugLevel)
cout << "QMF Agent Initialized: broker=" << brokerHost << ":" << brokerPort <<
@@ -139,10 +147,12 @@ void ManagementAgentImpl::init(string brokerHost,
if ((bootSequence & 0xF000) != 0)
bootSequence = 1;
storeData(true);
+
+ initialized = true;
}
-void ManagementAgentImpl::registerClass(string& packageName,
- string& className,
+void ManagementAgentImpl::registerClass(const string& packageName,
+ const string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -151,8 +161,8 @@ void ManagementAgentImpl::registerClass(string& packageName,
addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementAgentImpl::registerEvent(string& packageName,
- string& eventName,
+void ManagementAgentImpl::registerEvent(const string& packageName,
+ const string& eventName,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -605,7 +615,6 @@ void ManagementAgentImpl::periodicProcessing()
Mutex::ScopedLock lock(agentLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
- string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -692,8 +701,10 @@ void ManagementAgentImpl::periodicProcessing()
contentSize = BUFSIZE - msgBuffer.available();
if (contentSize > 0) {
msgBuffer.reset();
- routingKey = "console.obj." + baseObject->getPackageName() + "." + baseObject->getClassName();
- connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", routingKey);
+ stringstream key;
+ key << "console.obj." << baseObject->getPackageName() << "." << baseObject->getClassName() << "." <<
+ assignedBrokerBank << "." << assignedAgentBank;
+ connThreadBody.sendBuffer(msgBuffer, contentSize, "qpid.management", key.str());
}
}
@@ -721,10 +732,10 @@ void ManagementAgentImpl::ConnectionThread::run()
while (true) {
try {
- if (!agent.host.empty()) {
+ if (agent.initialized) {
if (agent.debugLevel)
cout << "QMF Agent attempting to connect to the broker..." << endl;
- connection.open(agent.host.c_str(), agent.port);
+ connection.open(agent.connectionSettings);
session = connection.newSession(queueName.str());
subscriptions = new client::SubscriptionManager(session);
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 5b6437944f..4ba9d7262a 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -22,6 +22,7 @@
#include "ManagementAgent.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/SubscriptionManager.h"
#include "qpid/client/Session.h"
#include "qpid/client/AsyncSession.h"
@@ -49,19 +50,23 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
// Methods from ManagementAgent
//
int getMaxThreads() { return 1; }
- void init(std::string brokerHost = "localhost",
- uint16_t brokerPort = 5672,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- std::string storeFile = "");
+ void init(const std::string& brokerHost = "localhost",
+ uint16_t brokerPort = 5672,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false,
+ const std::string& storeFile = "",
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& mech = "PLAIN",
+ const std::string& proto = "tcp");
bool isConnected() { return connected; }
std::string& getLastFailure() { return lastFailure; }
- void registerClass(std::string& packageName,
- std::string& className,
+ void registerClass(const std::string& packageName,
+ const std::string& className,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent(std::string& packageName,
- std::string& eventName,
+ void registerEvent(const std::string& packageName,
+ const std::string& eventName,
uint8_t* md5Sum,
management::ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
@@ -130,8 +135,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
sys::Mutex agentLock;
sys::Mutex addLock;
framing::Uuid systemId;
- std::string host;
- uint16_t port;
+ client::ConnectionSettings connectionSettings;
+ bool initialized;
bool connected;
std::string lastFailure;
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.cpp b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
index e2c735e660..23ef8d9e6a 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.cpp
@@ -113,7 +113,7 @@ ManagementBroker::~ManagementBroker ()
}
}
-void ManagementBroker::configure(string _dataDir, uint16_t _interval,
+void ManagementBroker::configure(const string& _dataDir, uint16_t _interval,
qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
@@ -178,8 +178,8 @@ void ManagementBroker::setExchange (qpid::broker::Exchange::shared_ptr _mexchang
dExchange = _dexchange;
}
-void ManagementBroker::registerClass (string& packageName,
- string& className,
+void ManagementBroker::registerClass (const string& packageName,
+ const string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -188,8 +188,8 @@ void ManagementBroker::registerClass (string& packageName,
addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
-void ManagementBroker::registerEvent (string& packageName,
- string& eventName,
+void ManagementBroker::registerEvent (const string& packageName,
+ const string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
@@ -251,7 +251,7 @@ void ManagementBroker::Periodic::fire ()
broker.periodicProcessing ();
}
-void ManagementBroker::clientAdded (void)
+void ManagementBroker::clientAdded (const std::string& /*routingKey*/)
{
Mutex::ScopedLock lock (userLock);
@@ -386,7 +386,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -398,7 +398,7 @@ void ManagementBroker::periodicProcessing (void)
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
- routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName ();
+ routingKey = "console.obj." + object->getPackageName() + "." + object->getClassName() + "1.0";
sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
}
@@ -721,7 +721,7 @@ bool ManagementBroker::bankInUse (uint32_t bank)
for (RemoteAgentMap::iterator aIter = remoteAgents.begin();
aIter != remoteAgents.end();
aIter++)
- if (aIter->second->objIdBank == bank)
+ if (aIter->second->brokerBank == bank)
return true;
return false;
}
@@ -796,7 +796,8 @@ void ManagementBroker::handleAttachRequestLH (Buffer& inBuffer, string replyToKe
assignedBank = assignBankLH(requestedAgentBank);
RemoteAgent* agent = new RemoteAgent;
- agent->objIdBank = assignedBank;
+ agent->brokerBank = brokerBank;
+ agent->agentBank = assignedBank;
agent->routingKey = replyToKey;
agent->connectionRef = connectionRef;
agent->mgmtObject = new _qmf::Agent (this, agent);
@@ -1006,7 +1007,7 @@ ManagementBroker::PackageMap::iterator ManagementBroker::findOrAddPackageLH(stri
void ManagementBroker::addClassLH(uint8_t kind,
PackageMap::iterator pIter,
- string& className,
+ const string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
diff --git a/qpid/cpp/src/qpid/management/ManagementBroker.h b/qpid/cpp/src/qpid/management/ManagementBroker.h
index 3564d462df..77f4a53836 100644
--- a/qpid/cpp/src/qpid/management/ManagementBroker.h
+++ b/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -47,30 +47,33 @@ public:
ManagementBroker ();
virtual ~ManagementBroker ();
- void configure (std::string dataDir, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize);
+ void configure (const std::string& dataDir, uint16_t interval,
+ qpid::broker::Broker* broker, int threadPoolSize);
void setInterval (uint16_t _interval) { interval = _interval; }
void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange,
qpid::broker::Exchange::shared_ptr directExchange);
int getMaxThreads () { return threadPoolSize; }
- void registerClass (std::string& packageName,
- std::string& className,
+ void registerClass (const std::string& packageName,
+ const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent (std::string& packageName,
- std::string& eventName,
+ void registerEvent (const std::string& packageName,
+ const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
ObjectId addObject (ManagementObject* object,
uint64_t persistId = 0);
void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT);
- void clientAdded ();
+ void clientAdded (const std::string& routingKey);
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
const framing::Uuid& getUuid() const { return uuid; }
// Stubs for remote management agent calls
- void init (std::string, uint16_t, uint16_t, bool, std::string) { assert(0); }
+ void init (const std::string&, uint16_t, uint16_t, bool,
+ const std::string&, const std::string&, const std::string&,
+ const std::string&, const std::string&) { assert(0); }
uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
int getSignalFd () { assert(0); return -1; }
@@ -91,7 +94,8 @@ private:
//
struct RemoteAgent : public Manageable
{
- uint32_t objIdBank;
+ uint32_t brokerBank;
+ uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
qmf::org::apache::qpid::broker::Agent* mgmtObject;
@@ -195,7 +199,7 @@ private:
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
PackageMap::iterator pIter,
- std::string& className,
+ const std::string& className,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
void encodePackageIndication (framing::Buffer& buf,
diff --git a/qpid/cpp/src/qpid/management/ManagementExchange.cpp b/qpid/cpp/src/qpid/management/ManagementExchange.cpp
index 4ccf8e68c9..4dcafbfcdd 100644
--- a/qpid/cpp/src/qpid/management/ManagementExchange.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementExchange.cpp
@@ -56,8 +56,8 @@ bool ManagementExchange::bind (Queue::shared_ptr queue,
const string& routingKey,
const qpid::framing::FieldTable* args)
{
- managementAgent->clientAdded ();
- return TopicExchange::bind (queue, routingKey, args);
+ managementAgent->clientAdded(routingKey);
+ return TopicExchange::bind(queue, routingKey, args);
}
void ManagementExchange::setManagmentAgent (ManagementBroker* agent)
diff --git a/qpid/python/qpid/qmfconsole.py b/qpid/python/qpid/qmfconsole.py
index 7d7aeee44f..06517d0aaf 100644
--- a/qpid/python/qpid/qmfconsole.py
+++ b/qpid/python/qpid/qmfconsole.py
@@ -224,7 +224,7 @@ class Session:
raise Exception("userBindings option not set for Session")
for broker in self.brokers:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.%s" % packageName)
+ binding_key="console.obj.%s.#" % packageName)
def bindClass(self, classKey):
""" """
@@ -233,7 +233,7 @@ class Session:
pname, cname, hash = classKey
for broker in self.brokers:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
- binding_key="console.obj.%s.%s" % (pname, cname))
+ binding_key="console.obj.%s.%s.#" % (pname, cname))
def getAgents(self, broker=None):
""" Get a list of currently known agents """
@@ -374,7 +374,7 @@ class Session:
if self.rcvEvents:
keyList.append("console.event.#")
if self.rcvHeartbeats:
- keyList.append("console.heartbeat")
+ keyList.append("console.heartbeat.#")
return keyList
def _handleBrokerConnect(self, broker):
@@ -692,13 +692,6 @@ class Package:
def __init__(self, name):
self.name = name
-class ClassKey:
- """ """
- def __init__(self, package, className, hash):
- self.package = package
- self.className = className
- self.hash = hash
-
class SchemaClass:
""" """
CLASS_KIND_TABLE = 1
diff --git a/qpid/ruby/lib/qpid/qmf.rb b/qpid/ruby/lib/qpid/qmf.rb
index e7dfae3e7e..2b7ab58e96 100644
--- a/qpid/ruby/lib/qpid/qmf.rb
+++ b/qpid/ruby/lib/qpid/qmf.rb
@@ -217,7 +217,7 @@ module Qpid::Qmf
@brokers.each do |broker|
args = { :exchange => "qpid.management",
:queue => broker.topicName,
- :binding_key => "console.obj.#{package_name}" }
+ :binding_key => "console.obj.#{package_name}.#" }
broker.amqpSession.exchange_bind(args)
end
end
@@ -230,7 +230,7 @@ module Qpid::Qmf
@brokers.each do |broker|
args = { :exchange => "qpid.management",
:queue => broker.topicName,
- :binding_key => "console.obj.#{pname}.#{cname}" }
+ :binding_key => "console.obj.#{pname}.#{cname}.#" }
broker.amqpSession.exchange_bind(args)
end
end