summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorNuno Santos <nsantos@apache.org>2008-04-15 16:12:01 +0000
committerNuno Santos <nsantos@apache.org>2008-04-15 16:12:01 +0000
commit0cb55b441ec82124319fd3b154261a07ded82df2 (patch)
tree23bff88d64af5486b8c4c58f28f0b9fd6358f3ac /qpid
parenta106490812ea883620e84f50b4ce001d375e81f7 (diff)
downloadqpid-python-0cb55b441ec82124319fd3b154261a07ded82df2.tar.gz
QPID-921: applied qpid-patch36.diff on behalf of Ted Ross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@648308 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp40
-rw-r--r--qpid/cpp/src/qpid/broker/System.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.cpp2
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp45
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h8
-rwxr-xr-xqpid/python/commands/qpid-config10
-rwxr-xr-xqpid/python/commands/qpid-route3
-rw-r--r--qpid/python/qpid/management.py8
-rw-r--r--qpid/python/qpid/managementdata.py33
-rw-r--r--qpid/specs/management-schema.xml2
12 files changed, 116 insertions, 44 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index b9268db9e5..689fb0687c 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -136,7 +136,7 @@ Broker::Broker(const Broker::Options& conf) :
managementAgent->setInterval (conf.mgmtPubInterval);
qpid::management::PackageQpid packageInitializer (managementAgent);
- System* system = new System ();
+ System* system = new System (dataDir.isEnabled () ? dataDir.getPath () : string ());
systemObject = System::shared_ptr (system);
mgmtObject = management::Broker::shared_ptr (new management::Broker (this, system, conf.port));
@@ -149,7 +149,7 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_dataDirEnabled (dataDir.isEnabled ());
mgmtObject->set_dataDir (dataDir.getPath ());
- managementAgent->addObject (mgmtObject, 1, 0);
+ managementAgent->addObject (mgmtObject, 1, 1);
// Since there is currently no support for virtual hosts, a placeholder object
// representing the implied single virtual host is added here to keep the
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 591e9796d6..b3d8fda53b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -69,7 +69,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent.get () != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, _owner != 0));
// Add the object to the management agent only if this queue is not durable.
// If it's durable, we will add it later when the queue is assigned a persistenceId.
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index 87d5185b97..33a9db46a2 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -19,19 +19,51 @@
#include "System.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/framing/Uuid.h"
#include <sys/utsname.h>
+#include <iostream>
+#include <fstream>
-using namespace qpid::broker;
using qpid::management::ManagementAgent;
+using namespace qpid::broker;
+using namespace std;
-System::System ()
+System::System (string _dataDir)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
{
+ framing::Uuid systemId;
+
+ if (_dataDir.empty ())
+ {
+ systemId.generate ();
+ }
+ else
+ {
+ string filename (_dataDir + "/systemId");
+ ifstream inFile (filename.c_str ());
+
+ if (inFile.good ())
+ {
+ inFile >> systemId;
+ inFile.close ();
+ }
+ else
+ {
+ systemId.generate ();
+ ofstream outFile (filename.c_str ());
+ if (outFile.good ())
+ {
+ outFile << systemId << endl;
+ outFile.close ();
+ }
+ }
+ }
+
mgmtObject = management::System::shared_ptr
- (new management::System (this, "host"));
+ (new management::System (this, systemId));
struct utsname _uname;
if (uname (&_uname) == 0)
{
@@ -42,7 +74,7 @@ System::System ()
mgmtObject->set_machine (std::string (_uname.machine));
}
- agent->addObject (mgmtObject, 3, 0);
+ agent->addObject (mgmtObject, 3, 1);
}
}
diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h
index a1a710f2b2..0d63bd1b3d 100644
--- a/qpid/cpp/src/qpid/broker/System.h
+++ b/qpid/cpp/src/qpid/broker/System.h
@@ -23,6 +23,7 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/System.h"
#include <boost/shared_ptr.hpp>
+#include <string>
namespace qpid {
namespace broker {
@@ -37,7 +38,7 @@ class System : public management::Manageable
typedef boost::shared_ptr<System> shared_ptr;
- System ();
+ System (std::string _dataDir);
management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp
index 06a8c8eca3..c2c7e985b4 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.cpp
+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp
@@ -33,7 +33,7 @@ Vhost::Vhost (management::Manageable* parentBroker)
{
mgmtObject = management::Vhost::shared_ptr
(new management::Vhost (this, parentBroker, "/"));
- agent->addObject (mgmtObject, 2, 0);
+ agent->addObject (mgmtObject, 2, 1);
}
}
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index ee0eb27bf6..cace04bef5 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -42,20 +42,24 @@ ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
dataDir (_dataDir), interval (_interval)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this, interval)));
- nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+ localBank = 3;
+ nextObjectId = 1;
nextRemotePrefix = 101;
// Get from file or generate and save to file.
if (dataDir.empty ())
{
uuid.generate ();
+ bootSequence = 1;
QPID_LOG (info, "ManagementAgent has no data directory, generated new broker ID: "
<< uuid);
}
else
{
- string filename (dataDir + "/brokerId");
- ifstream inFile (filename.c_str ());
+ string filename (dataDir + "/brokerId");
+ string seqFilename (dataDir + "/bootseq");
+ ifstream inFile (filename.c_str ());
+ ifstream seqFile (seqFilename.c_str ());
if (inFile.good ())
{
@@ -80,6 +84,26 @@ ManagementAgent::ManagementAgent (string _dataDir, uint16_t _interval) :
QPID_LOG (warning, "ManagementAgent unable to save broker ID");
}
}
+
+ if (seqFile.good ())
+ {
+ seqFile >> bootSequence;
+ seqFile.close ();
+ }
+ else
+ bootSequence = 1;
+
+ ofstream seqOut (seqFilename.c_str ());
+ if (seqOut.good ())
+ {
+ uint16_t nextSeq = (bootSequence + 1) & 0x7FFF;
+ if (nextSeq == 0)
+ nextSeq = 1;
+ seqOut << nextSeq << endl;
+ seqOut.close ();
+ }
+
+ QPID_LOG (debug, "ManagementAgent boot sequence: " << bootSequence);
}
}
@@ -125,16 +149,17 @@ void ManagementAgent::RegisterClass (string packageName,
}
void ManagementAgent::addObject (ManagementObject::shared_ptr object,
- uint64_t /*persistenceId*/,
- uint64_t /*idOffset*/)
+ uint32_t persistId,
+ uint32_t persistBank)
{
Mutex::ScopedLock lock (userLock);
uint64_t objectId;
-// if (persistenceId == 0)
- objectId = nextObjectId++;
-// else
-// objectId = 0x8000000000000000ULL | (persistenceId + idOffset);
+ if (persistId == 0)
+ objectId = ((uint64_t) bootSequence) << 48 |
+ ((uint64_t) localBank) << 24 | nextObjectId++;
+ else
+ objectId = ((uint64_t) persistBank) << 24 | persistId;
object->setObjectId (objectId);
managementObjects[objectId] = object;
@@ -384,7 +409,7 @@ void ManagementAgent::dispatchMethod (Message& msg,
EncodeHeader (outBuffer, 'm', sequence);
ManagementObjectMap::iterator iter = managementObjects.find (objId);
- if (iter == managementObjects.end ())
+ if (iter == managementObjects.end () || iter->second->isDeleted ())
{
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putShortString (Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index bd86d4e773..4cd679a035 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -58,8 +58,8 @@ class ManagementAgent
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
void addObject (ManagementObject::shared_ptr object,
- uint64_t persistenceId = 0,
- uint64_t idOffset = 10);
+ uint32_t persistId = 0,
+ uint32_t persistBank = 2);
void clientAdded (void);
void dispatchCommand (broker::Deliverable& msg,
const std::string& routingKey,
@@ -142,7 +142,9 @@ class ManagementAgent
broker::Exchange::shared_ptr dExchange;
std::string dataDir;
uint16_t interval;
- uint64_t nextObjectId;
+ uint16_t bootSequence;
+ uint32_t localBank;
+ uint32_t nextObjectId;
uint32_t nextRemotePrefix;
# define MA_BUFFER_SIZE 65536
diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config
index 3fd8e93c63..b93e74c4d2 100755
--- a/qpid/python/commands/qpid-config
+++ b/qpid/python/commands/qpid-config
@@ -97,13 +97,17 @@ class BrokerManager:
try:
self.spec = qpid.spec.load (_specpath)
self.client = Client (self.broker.host, self.broker.port, self.spec)
- self.client.start ({"LOGIN":"guest","PASSWORD":"guest"})
+ self.client.start (response='\x00' + "guest" + '\x00' + "guest",
+ mechanism="PLAIN")
self.channel = self.client.channel (1)
self.mclient = managementClient (self.spec)
self.mchannel = self.mclient.addChannel (self.channel)
except socket.error, e:
- print "Connect Error:", e
- exit (1)
+ print "Socket Error:", e
+ sys.exit (1)
+ except Closed, e:
+ print "Connect Failed:", e
+ sys.exit (1)
def Overview (self):
self.ConnectToBroker ()
diff --git a/qpid/python/commands/qpid-route b/qpid/python/commands/qpid-route
index 0db28c791b..c268c638c8 100755
--- a/qpid/python/commands/qpid-route
+++ b/qpid/python/commands/qpid-route
@@ -79,7 +79,8 @@ class RouteManager:
try:
self.spec = qpid.spec.load (_specpath)
self.client = Client (broker.host, broker.port, self.spec)
- self.client.start ({"LOGIN":"guest","PASSWORD":"guest"})
+ self.client.start (response='\x00' + "guest" + '\x00' + "guest",
+ mechanism="PLAIN")
self.channel = self.client.channel (1)
self.mclient = managementClient (self.spec)
self.mch = self.mclient.addChannel (self.channel)
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index d2dcacd62a..d32d458270 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -83,11 +83,11 @@ class methodResult:
class managementChannel:
""" This class represents a connection to an AMQP broker. """
- def __init__ (self, ch, topicCb, replyCb, cbContext):
+ def __init__ (self, ch, topicCb, replyCb, cbContext, _detlife=0):
""" Given a channel on an established AMQP broker connection, this method
opens a session and performs all of the declarations and bindings needed
to participate in the management protocol. """
- response = ch.session_open (detached_lifetime=300)
+ response = ch.session_open (detached_lifetime=_detlife)
self.sessionId = response.session_id
self.topicName = "mgmt-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
self.replyName = "repl-%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", response.session_id)
@@ -97,8 +97,8 @@ class managementChannel:
self.context = cbContext
self.reqsOutstanding = 0
- ch.queue_declare (queue=self.topicName, exclusive=1, auto_delete=1)
- ch.queue_declare (queue=self.replyName, exclusive=1, auto_delete=1)
+ ch.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True)
+ ch.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True)
ch.queue_bind (exchange="qpid.management",
queue=self.topicName, routing_key="mgmt.#")
diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py
index ff43352247..2876bf0948 100644
--- a/qpid/python/qpid/managementdata.py
+++ b/qpid/python/qpid/managementdata.py
@@ -21,6 +21,7 @@
import qpid
import socket
+import struct
from qpid.management import managementChannel, managementClient
from threading import Lock
from disp import Display
@@ -63,19 +64,24 @@ class ManagementData:
#
def registerObjId (self, objId):
- if self.baseId == 0:
- if objId & 0x8000000000000000L == 0:
- self.baseId = objId - 1000
+ boot = objId & 0x7FFF000000000000L
+ if boot == 0:
+ return
+ self.bootSequence = boot
def displayObjId (self, objId):
- if objId & 0x8000000000000000L == 0:
- return objId - self.baseId
- return (objId & 0x7fffffffffffffffL) + 5000
+ bank = (objId & 0x0000FFFFFF000000L) >> 24
+ id = objId & 0x0000000000FFFFFFL
+ return bank * 1000 + id
def rawObjId (self, displayId):
- if displayId < 5000:
- return displayId + self.baseId
- return displayId - 5000 + 0x8000000000000000L
+ bank = displayId / 1000
+ id = displayId % 1000
+ if bank < 3:
+ objId = (bank << 24) + id
+ else:
+ objId = self.bootSequence + (bank << 24) + id
+ return objId
def displayClassName (self, cls):
(packageName, className, hash) = cls
@@ -158,7 +164,7 @@ class ManagementData:
self.lock = Lock ()
self.tables = {}
self.schema = {}
- self.baseId = 0
+ self.bootSequence = 0
self.disp = disp
self.lastUnit = None
self.methodSeq = 1
@@ -166,7 +172,8 @@ class ManagementData:
self.broker = Broker (host)
self.client = Client (self.broker.host, self.broker.port, self.spec)
- self.client.start ({"LOGIN": username, "PASSWORD": password})
+ self.client.start (response='\x00' + username + '\x00' + password,
+ mechanism="PLAIN")
self.channel = self.client.channel (1)
self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
@@ -189,7 +196,7 @@ class ManagementData:
if item[0] == key:
typecode = item[1]
unit = item[2]
- if (typecode >= 1 and typecode <= 5) or typecode >= 12: # numerics
+ if (typecode >= 1 and typecode <= 5) or typecode == 12 or typecode == 13: # numerics
if unit == None or unit == self.lastUnit:
return str (value)
else:
@@ -214,7 +221,7 @@ class ManagementData:
else:
return "True"
elif typecode == 14:
- return str (UUID (bytes=value))
+ return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", value)
elif typecode == 15:
return str (value)
return "*type-error*"
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index e37921a8f5..5e9d64597d 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -45,7 +45,7 @@
===============================================================
-->
<class name="system">
- <configElement name="sysId" index="y" type="sstr" access="RC"/>
+ <configElement name="sysId" index="y" type="uuid" access="RC"/>
<configElement name="osName" type="sstr" access="RO" desc="Operating System Name"/>
<configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/>