diff options
| author | Nuno Santos <nsantos@apache.org> | 2008-04-15 16:12:01 +0000 |
|---|---|---|
| committer | Nuno Santos <nsantos@apache.org> | 2008-04-15 16:12:01 +0000 |
| commit | 0cb55b441ec82124319fd3b154261a07ded82df2 (patch) | |
| tree | 23bff88d64af5486b8c4c58f28f0b9fd6358f3ac /qpid | |
| parent | a106490812ea883620e84f50b4ce001d375e81f7 (diff) | |
| download | qpid-python-0cb55b441ec82124319fd3b154261a07ded82df2.tar.gz | |
QPID-921: applied qpid-patch36.diff on behalf of Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648308 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/System.cpp | 40 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/System.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Vhost.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 8 | ||||
| -rwxr-xr-x | qpid/python/commands/qpid-config | 10 | ||||
| -rwxr-xr-x | qpid/python/commands/qpid-route | 3 | ||||
| -rw-r--r-- | qpid/python/qpid/management.py | 8 | ||||
| -rw-r--r-- | qpid/python/qpid/managementdata.py | 33 | ||||
| -rw-r--r-- | qpid/specs/management-schema.xml | 2 |
12 files changed, 116 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index b9268db9e5..689fb0687c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -136,7 +136,7 @@ Broker::Broker(const Broker::Options& conf) : managementAgent->setInterval (conf.mgmtPubInterval); qpid::management::PackageQpid packageInitializer (managementAgent); - System* system = new System (); + System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ()); systemObject = System::shared_ptr (system); mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port)); @@ -149,7 +149,7 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_dataDirEnabled (dataDir.isEnabled ()); mgmtObject->set_dataDir (dataDir.getPath ()); - managementAgent->addObject (mgmtObject, 1, 0); + managementAgent->addObject (mgmtObject, 1, 1); // Since there is currently no support for virtual hosts, a placeholder object // representing the implied single virtual host is added here to keep the diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 591e9796d6..b3d8fda53b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -69,7 +69,7 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent.get () != 0) { mgmtObject = management::Queue::shared_ptr - (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0)); + (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0)); // Add the object to the management agent only if this queue is not durable. // If it's durable, we will add it later when the queue is assigned a persistenceId. diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index 87d5185b97..33a9db46a2 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -19,19 +19,51 @@ #include "System.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/framing/Uuid.h" #include <sys/utsname.h> +#include <iostream> +#include <fstream> -using namespace qpid::broker; using qpid::management::ManagementAgent; +using namespace qpid::broker; +using namespace std; -System::System () +System::System (string _dataDir) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) { + framing::Uuid systemId; + + if (_dataDir.empty ()) + { + systemId.generate (); + } + else + { + string filename (_dataDir + "/systemId"); + ifstream inFile (filename.c_str ()); + + if (inFile.good ()) + { + inFile >> systemId; + inFile.close (); + } + else + { + systemId.generate (); + ofstream outFile (filename.c_str ()); + if (outFile.good ()) + { + outFile << systemId << endl; + outFile.close (); + } + } + } + mgmtObject = management::System::shared_ptr - (new management::System (this, "host")); + (new management::System (this, systemId)); struct utsname _uname; if (uname (&_uname) == 0) { @@ -42,7 +74,7 @@ System::System () mgmtObject->set_machine (std::string (_uname.machine)); } - agent->addObject (mgmtObject, 3, 0); + agent->addObject (mgmtObject, 3, 1); } } diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index a1a710f2b2..0d63bd1b3d 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -23,6 +23,7 @@ #include "qpid/management/Manageable.h" #include "qpid/management/System.h" #include <boost/shared_ptr.hpp> +#include <string> namespace qpid { namespace broker { @@ -37,7 +38,7 @@ class System : public management::Manageable typedef boost::shared_ptr<System> shared_ptr; - System (); + System (std::string _dataDir); management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index 06a8c8eca3..c2c7e985b4 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -33,7 +33,7 @@ Vhost::Vhost (management::Manageable* parentBroker) { mgmtObject = management::Vhost::shared_ptr (new management::Vhost (this, parentBroker, "/")); - agent->addObject (mgmtObject, 2, 0); + agent->addObject (mgmtObject, 2, 1); } } } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index ee0eb27bf6..cace04bef5 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -42,20 +42,24 @@ ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : dataDir (_dataDir), interval (_interval) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval))); - nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ())); + localBank = 3; + nextObjectId = 1; nextRemotePrefix = 101; // Get from file or generate and save to file. if (dataDir.empty ()) { uuid.generate (); + bootSequence = 1; QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: " << uuid); } else { - string filename (dataDir + "/brokerId"); - ifstream inFile (filename.c_str ()); + string filename (dataDir + "/brokerId"); + string seqFilename (dataDir + "/bootseq"); + ifstream inFile (filename.c_str ()); + ifstream seqFile (seqFilename.c_str ()); if (inFile.good ()) { @@ -80,6 +84,26 @@ ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) : QPID_LOG (warning, "ManagementAgent unable to save broker ID"); } } + + if (seqFile.good ()) + { + seqFile >> bootSequence; + seqFile.close (); + } + else + bootSequence = 1; + + ofstream seqOut (seqFilename.c_str ()); + if (seqOut.good ()) + { + uint16_t nextSeq = (bootSequence + 1) & 0x7FFF; + if (nextSeq == 0) + nextSeq = 1; + seqOut << nextSeq << endl; + seqOut.close (); + } + + QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence); } } @@ -125,16 +149,17 @@ void ManagementAgent::RegisterClass (string packageName, } void ManagementAgent::addObject (ManagementObject::shared_ptr object, - uint64_t /*persistenceId*/, - uint64_t /*idOffset*/) + uint32_t persistId, + uint32_t persistBank) { Mutex::ScopedLock lock (userLock); uint64_t objectId; -// if (persistenceId == 0) - objectId = nextObjectId++; -// else -// objectId = 0x8000000000000000ULL | (persistenceId + idOffset); + if (persistId == 0) + objectId = ((uint64_t) bootSequence) << 48 | + ((uint64_t) localBank) << 24 | nextObjectId++; + else + objectId = ((uint64_t) persistBank) << 24 | persistId; object->setObjectId (objectId); managementObjects[objectId] = object; @@ -384,7 +409,7 @@ void ManagementAgent::dispatchMethod (Message& msg, EncodeHeader (outBuffer, 'm', sequence); ManagementObjectMap::iterator iter = managementObjects.find (objId); - if (iter == managementObjects.end ()) + if (iter == managementObjects.end () || iter->second->isDeleted ()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index bd86d4e773..4cd679a035 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -58,8 +58,8 @@ class ManagementAgent uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); void addObject (ManagementObject::shared_ptr object, - uint64_t persistenceId = 0, - uint64_t idOffset = 10); + uint32_t persistId = 0, + uint32_t persistBank = 2); void clientAdded (void); void dispatchCommand (broker::Deliverable& msg, const std::string& routingKey, @@ -142,7 +142,9 @@ class ManagementAgent broker::Exchange::shared_ptr dExchange; std::string dataDir; uint16_t interval; - uint64_t nextObjectId; + uint16_t bootSequence; + uint32_t localBank; + uint32_t nextObjectId; uint32_t nextRemotePrefix; # define MA_BUFFER_SIZE 65536 diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config index 3fd8e93c63..b93e74c4d2 100755 --- a/qpid/python/commands/qpid-config +++ b/qpid/python/commands/qpid-config @@ -97,13 +97,17 @@ class BrokerManager: try: self.spec = qpid.spec.load (_specpath) self.client = Client (self.broker.host, self.broker.port, self.spec) - self.client.start ({"LOGIN":"guest","PASSWORD":"guest"}) + self.client.start (response='\x00' + "guest" + '\x00' + "guest", + mechanism="PLAIN") self.channel = self.client.channel (1) self.mclient = managementClient (self.spec) self.mchannel = self.mclient.addChannel (self.channel) except socket.error, e: - print "Connect Error:", e - exit (1) + print "Socket Error:", e + sys.exit (1) + except Closed, e: + print "Connect Failed:", e + sys.exit (1) def Overview (self): self.ConnectToBroker () diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route index 0db28c791b..c268c638c8 100755 --- a/qpid/python/commands/qpid-route +++ b/qpid/python/commands/qpid-route @@ -79,7 +79,8 @@ class RouteManager: try: self.spec = qpid.spec.load (_specpath) self.client = Client (broker.host, broker.port, self.spec) - self.client.start ({"LOGIN":"guest","PASSWORD":"guest"}) + self.client.start (response='\x00' + "guest" + '\x00' + "guest", + mechanism="PLAIN") self.channel = self.client.channel (1) self.mclient = managementClient (self.spec) self.mch = self.mclient.addChannel (self.channel) diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py index d2dcacd62a..d32d458270 100644 --- a/qpid/python/qpid/management.py +++ b/qpid/python/qpid/management.py @@ -83,11 +83,11 @@ class methodResult: class managementChannel: """ This class represents a connection to an AMQP broker. """ - def __init__ (self, ch, topicCb, replyCb, cbContext): + def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0): """ Given a channel on an established AMQP broker connection, this method opens a session and performs all of the declarations and bindings needed to participate in the management protocol. """ - response = ch.session_open (detached_lifetime=300) + response = ch.session_open (detached_lifetime=_detlife) self.sessionId = response.session_id self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id) @@ -97,8 +97,8 @@ class managementChannel: self.context = cbContext self.reqsOutstanding = 0 - ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1) - ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1) + ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) + ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) ch.queue_bind (exchange="qpid.management", queue=self.topicName, routing_key="mgmt.#") diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py index ff43352247..2876bf0948 100644 --- a/qpid/python/qpid/managementdata.py +++ b/qpid/python/qpid/managementdata.py @@ -21,6 +21,7 @@ import qpid import socket +import struct from qpid.management import managementChannel, managementClient from threading import Lock from disp import Display @@ -63,19 +64,24 @@ class ManagementData: # def registerObjId (self, objId): - if self.baseId == 0: - if objId & 0x8000000000000000L == 0: - self.baseId = objId - 1000 + boot = objId & 0x7FFF000000000000L + if boot == 0: + return + self.bootSequence = boot def displayObjId (self, objId): - if objId & 0x8000000000000000L == 0: - return objId - self.baseId - return (objId & 0x7fffffffffffffffL) + 5000 + bank = (objId & 0x0000FFFFFF000000L) >> 24 + id = objId & 0x0000000000FFFFFFL + return bank * 1000 + id def rawObjId (self, displayId): - if displayId < 5000: - return displayId + self.baseId - return displayId - 5000 + 0x8000000000000000L + bank = displayId / 1000 + id = displayId % 1000 + if bank < 3: + objId = (bank << 24) + id + else: + objId = self.bootSequence + (bank << 24) + id + return objId def displayClassName (self, cls): (packageName, className, hash) = cls @@ -158,7 +164,7 @@ class ManagementData: self.lock = Lock () self.tables = {} self.schema = {} - self.baseId = 0 + self.bootSequence = 0 self.disp = disp self.lastUnit = None self.methodSeq = 1 @@ -166,7 +172,8 @@ class ManagementData: self.broker = Broker (host) self.client = Client (self.broker.host, self.broker.port, self.spec) - self.client.start ({"LOGIN": username, "PASSWORD": password}) + self.client.start (response='\x00' + username + '\x00' + password, + mechanism="PLAIN") self.channel = self.client.channel (1) self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler, @@ -189,7 +196,7 @@ class ManagementData: if item[0] == key: typecode = item[1] unit = item[2] - if (typecode >= 1 and typecode <= 5) or typecode >= 12: # numerics + if (typecode >= 1 and typecode <= 5) or typecode == 12 or typecode == 13: # numerics if unit == None or unit == self.lastUnit: return str (value) else: @@ -214,7 +221,7 @@ class ManagementData: else: return "True" elif typecode == 14: - return str (UUID (bytes=value)) + return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", value) elif typecode == 15: return str (value) return "*type-error*" diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index e37921a8f5..5e9d64597d 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -45,7 +45,7 @@ =============================================================== --> <class name="system"> - <configElement name="sysId" index="y" type="sstr" access="RC"/> + <configElement name="sysId" index="y" type="uuid" access="RC"/> <configElement name="osName" type="sstr" access="RO" desc="Operating System Name"/> <configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/> |
