summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/posix/QpiddBroker.cpp4
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.cpp8
-rw-r--r--qpid/cpp/src/qpid/acl/Acl.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h6
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp37
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h6
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h10
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h4
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/System.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Vhost.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h4
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp5
-rw-r--r--qpid/cpp/src/qpid/ha/StatusCheck.cpp4
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp805
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h123
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp1161
-rw-r--r--qpid/cpp/src/tests/testagent.cpp16
35 files changed, 1118 insertions, 1220 deletions
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp
index f1f9009568..a681a6d18d 100644
--- a/qpid/cpp/src/posix/QpiddBroker.cpp
+++ b/qpid/cpp/src/posix/QpiddBroker.cpp
@@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon {
uint16_t port=brokerPtr->getPort(options->daemon.transport);
ready(port); // Notify parent.
if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) {
- dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
}
brokerPtr->run();
}
@@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) {
uint16_t port = brokerPtr->getPort(myOptions->daemon.transport);
cout << port << endl;
if (options->broker.enableMgmt) {
- dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port);
+ boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port);
}
}
brokerPtr->run();
diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp
index cd79add720..61e0b56104 100644
--- a/qpid/cpp/src/qpid/acl/Acl.cpp
+++ b/qpid/cpp/src/qpid/acl/Acl.cpp
@@ -52,7 +52,7 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::acl;
-Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0),
+Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false),
connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)),
resourceCounter(new ResourceCounter(*this, aclValues.aclMaxQueuesPerUser)){
@@ -60,7 +60,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals
if (agent != 0){
_qmf::Package packageInit(agent);
- mgmtObject = new _qmf::Acl (agent, this, broker);
+ mgmtObject = _qmf::Acl::shared_ptr(new _qmf::Acl (agent, this, broker));
agent->addObject (mgmtObject);
mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal);
mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp);
@@ -317,9 +317,9 @@ Acl::~Acl(){
broker->getConnectionObservers().remove(connectionCounter);
}
-ManagementObject* Acl::GetManagementObject(void) const
+ManagementObject::shared_ptr Acl::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& args, string& text)
diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h
index a23952ff53..28cbfb8f3f 100644
--- a/qpid/cpp/src/qpid/acl/Acl.h
+++ b/qpid/cpp/src/qpid/acl/Acl.h
@@ -62,7 +62,7 @@ private:
broker::Broker* broker;
bool transferAcl;
boost::shared_ptr<AclData> data;
- qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle
+ qmf::org::apache::qpid::acl::Acl::shared_ptr mgmtObject;
qpid::management::ManagementAgent* agent;
mutable qpid::sys::Mutex dataLock;
boost::shared_ptr<ConnectionCounter> connectionCounter;
@@ -113,7 +113,7 @@ private:
bool readAclFile(std::string& aclFile, std::string& errorText);
Manageable::status_t lookup (management::Args& args, std::string& text);
Manageable::status_t lookupPublish(management::Args& args, std::string& text);
- virtual qpid::management::ManagementObject* GetManagementObject(void) const;
+ virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const;
virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
};
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index d1706b5907..dfc99bb834 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -60,7 +60,7 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame)
Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
CancellationListener l, const _qmf::ArgsLinkBridge& _args,
InitializeCallback init, const std::string& _queueName, const string& ae) :
- link(_link), channel(_id), args(_args), mgmtObject(0),
+ link(_link), channel(_id), args(_args),
listener(l), name(_name),
queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag()
: _queueName),
@@ -71,10 +71,10 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id,
{
ManagementAgent* agent = link->getBroker()->getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Bridge
+ mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge
(agent, this, link, name, args.i_durable, args.i_src, args.i_dest,
args.i_key, args.i_srcIsQueue, args.i_srcIsLocal,
- args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
+ args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync));
mgmtObject->set_channelId(channel);
agent->addObject(mgmtObject);
}
@@ -296,9 +296,9 @@ uint32_t Bridge::encodedSize() const
+ 2; // sync
}
-management::ManagementObject* Bridge::GetManagementObject (void) const
+management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
{
- return (management::ManagementObject*) mgmtObject;
+ return mgmtObject;
}
management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId,
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index ee298afd45..2b4d019ccd 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -72,7 +72,7 @@ class Bridge : public PersistableConfig,
bool isDetached() const { return detached; }
- management::ManagementObject* GetManagementObject() const;
+ management::ManagementObject::shared_ptr GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
management::Args& args,
std::string& text);
@@ -128,7 +128,7 @@ class Bridge : public PersistableConfig,
Link* const link;
const framing::ChannelId channel;
qmf::org::apache::qpid::broker::ArgsLinkBridge args;
- qmf::org::apache::qpid::broker::Bridge* mgmtObject;
+ qmf::org::apache::qpid::broker::Bridge::shared_ptr mgmtObject;
CancellationListener listener;
std::string name;
std::string queueName;
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 32b0fdd46e..0c466aea07 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -214,7 +214,6 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
- mgmtObject(0),
queueCleaner(queues, &timer),
recoveryInProgress(false),
recovery(true),
@@ -235,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) :
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
systemObject = System::shared_ptr(system);
- mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker");
+ mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
mgmtObject->set_port(conf.port);
mgmtObject->set_workerThreads(conf.workerThreads);
@@ -450,9 +449,9 @@ Broker::~Broker() {
QPID_LOG(notice, "Shut down");
}
-ManagementObject* Broker::GetManagementObject(void) const
+ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable* Broker::GetVhostObject(void) const
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 9b90330cb9..11757657b8 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -171,7 +171,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
- qmf::org::apache::qpid::broker::Broker* mgmtObject;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
@@ -230,7 +230,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index 08a9c756d0..043325a4fa 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -97,7 +97,6 @@ Connection::Connection(ConnectionOutputHandler* out_,
link(link_),
mgmtClosing(false),
mgmtId(mgmtId_),
- mgmtObject(0),
links(broker_.getLinks()),
agent(0),
timer(broker_.getTimer()),
@@ -119,7 +118,7 @@ void Connection::addManagementObject() {
agent = broker.getManagementAgent();
if (agent != 0) {
// TODO set last bool true if system connection
- mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false);
+ mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false));
mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId);
}
@@ -403,9 +402,9 @@ SessionHandler& Connection::getChannel(ChannelId id) {
return *ptr_map_ptr(i);
}
-ManagementObject* Connection::GetManagementObject(void) const
+ManagementObject::shared_ptr Connection::GetManagementObject(void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&)
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index d01599ce54..3ef9877750 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler,
void closeChannel(framing::ChannelId channel);
// Manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
@@ -196,7 +196,7 @@ class Connection : public sys::ConnectionInputHandler,
const std::string mgmtId;
sys::Mutex ioCallbackLock;
std::queue<boost::function0<void> > ioCallbacks;
- qmf::org::apache::qpid::broker::Connection* mgmtObject;
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
LinkRegistry& links;
management::ManagementAgent* agent;
sys::Timer& timer;
@@ -231,7 +231,7 @@ class Connection : public sys::ConnectionInputHandler,
public:
- qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; }
+ qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index a484cc054e..d1dd1fae6c 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -150,7 +150,7 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProp
void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
{
const framing::FieldTable& clientProperties = body.getClientProperties();
- qmf::org::apache::qpid::broker::Connection* mgmtObject = connection.getMgmtObject();
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
if (mgmtObject != 0) {
string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp
index bb5dc2b807..12360df81d 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.cpp
+++ b/qpid/cpp/src/qpid/broker/Exchange.cpp
@@ -165,19 +165,19 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+ sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+ mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
}
@@ -185,20 +185,20 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
+ args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
+ mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name));
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
if (broker)
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject());
}
}
@@ -294,9 +294,9 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges)
}
}
-ManagementObject* Exchange::GetManagementObject (void) const
+ManagementObject::shared_ptr Exchange::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtExchange;
+ return mgmtExchange;
}
void Exchange::registerDynamicBridge(DynamicBridge* db)
@@ -345,16 +345,16 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons
Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent,
FieldTable _args, const string& _origin)
- : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0)
+ : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin)
{
}
Exchange::Binding::~Binding ()
{
if (mgmtBinding != 0) {
- ManagementObject* mo = queue->GetManagementObject();
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0)
- static_cast<_qmf::Queue*>(mo)->dec_bindingCount();
+ mo->dec_bindingCount();
mgmtBinding->resourceDestroy ();
}
}
@@ -367,25 +367,25 @@ void Exchange::Binding::startManagement()
if (broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
if (agent != 0) {
- ManagementObject* mo = queue->GetManagementObject();
+ _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject());
if (mo != 0) {
management::ObjectId queueId = mo->getObjectId();
- mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
+ mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding
+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)));
if (!origin.empty())
mgmtBinding->set_origin(origin);
agent->addObject(mgmtBinding);
- static_cast<_qmf::Queue*>(mo)->inc_bindingCount();
+ mo->inc_bindingCount();
}
}
}
}
}
-ManagementObject* Exchange::Binding::GetManagementObject () const
+ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const
{
- return (ManagementObject*) mgmtBinding;
+ return mgmtBinding;
}
Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {}
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
index 26ee5b0054..517b551a83 100644
--- a/qpid/cpp/src/qpid/broker/Exchange.h
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -52,13 +52,13 @@ public:
const std::string key;
const framing::FieldTable args;
std::string origin;
- qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+ qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding;
Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
void startManagement();
- management::ManagementObject* GetManagementObject() const;
+ management::ManagementObject::shared_ptr GetManagementObject() const;
};
private:
@@ -159,8 +159,8 @@ protected:
}
};
- qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
+ qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
@@ -210,7 +210,7 @@ public:
static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const;
// Federation hooks
class DynamicBridge {
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index afa5623ecd..b014217180 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -143,7 +143,7 @@ Link::Link(const string& _name,
host(_host), port(_port), transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
- persistenceId(0), mgmtObject(0), broker(_broker), state(0),
+ persistenceId(0), broker(_broker), state(0),
visitCount(0),
currentInterval(1),
closing(false),
@@ -161,7 +161,7 @@ Link::Link(const string& _name,
agent = broker->getManagementAgent();
if (agent != 0)
{
- mgmtObject = new _qmf::Link(agent, this, parent, name, durable);
+ mgmtObject = _qmf::Link::shared_ptr(new _qmf::Link(agent, this, parent, name, durable));
mgmtObject->set_host(host);
mgmtObject->set_port(port);
mgmtObject->set_transport(transport);
@@ -638,9 +638,9 @@ uint32_t Link::encodedSize() const
+ password.size() + 1;
}
-ManagementObject* Link::GetManagementObject (void) const
+ManagementObject::shared_ptr Link::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
void Link::close() {
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index f0cb90e73b..49ee3ef36a 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -69,7 +69,7 @@ class Link : public PersistableConfig, public management::Manageable {
std::string username;
std::string password;
mutable uint64_t persistenceId;
- qmf::org::apache::qpid::broker::Link* mgmtObject;
+ qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject;
Broker* broker;
int state;
uint32_t visitCount;
@@ -181,7 +181,7 @@ class Link : public PersistableConfig, public management::Manageable {
static bool isEncodedLink(const std::string& key);
// Manageable entry points
- management::ManagementObject* GetManagementObject(void) const;
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&);
// manage the exchange owned by this link
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 9cf2f541ce..9dc9ec7a6d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -77,8 +77,8 @@ namespace
{
inline void mgntEnqStats(const Message& msg,
- _qmf::Queue* mgmtObject,
- _qmf::Broker* brokerMgmtObject)
+ _qmf::Queue::shared_ptr mgmtObject,
+ _qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0) {
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
@@ -101,8 +101,8 @@ inline void mgntEnqStats(const Message& msg,
}
inline void mgntDeqStats(const Message& msg,
- _qmf::Queue* mgmtObject,
- _qmf::Broker* brokerMgmtObject)
+ _qmf::Queue::shared_ptr mgmtObject,
+ _qmf::Broker::shared_ptr brokerMgmtObject)
{
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
@@ -179,8 +179,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
messages(new MessageDeque()),
persistenceId(0),
settings(b ? merge(_settings, b->getOptions()) : _settings),
- mgmtObject(0),
- brokerMgmtObject(0),
eventMode(0),
broker(b),
deleted(false),
@@ -195,17 +193,17 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
qpid::amqp_0_10::translate(settings.asMap(), encodableSettings);
if (parent != 0 && broker != 0) {
ManagementAgent* agent = broker->getManagementAgent();
-
if (agent != 0) {
- mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete);
+ mgmtObject = _qmf::Queue::shared_ptr(
+ new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete));
mgmtObject->set_arguments(settings.asMap());
agent->addObject(mgmtObject, 0, store != 0);
- brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject());
if (brokerMgmtObject)
brokerMgmtObject->inc_queueCount();
}
}
-
+
if ( settings.isBrowseOnly ) {
QPID_LOG ( info, "Queue " << name << " is browse-only." );
}
@@ -213,11 +211,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
Queue::~Queue()
{
- if (mgmtObject != 0) {
- mgmtObject->resourceDestroy();
- if (brokerMgmtObject)
- brokerMgmtObject->dec_queueCount();
- }
}
bool isLocalTo(const OwnershipToken* token, const Message& msg)
@@ -1076,6 +1069,12 @@ void Queue::destroyed()
boost::bind(&QueueObserver::destroy, _1));
observers.clear();
}
+
+ if (mgmtObject != 0) {
+ mgmtObject->resourceDestroy();
+ if (brokerMgmtObject)
+ brokerMgmtObject->dec_queueCount();
+ }
}
void Queue::notifyDeleted()
@@ -1109,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const
{
if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore)
{
- ManagementObject* childObj = externalQueueStore->GetManagementObject();
+ ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject();
if (childObj != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1263,7 +1262,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
externalQueueStore = inst;
if (inst) {
- ManagementObject* childObj = inst->GetManagementObject();
+ ManagementObject::shared_ptr childObj = inst->GetManagementObject();
if (childObj != 0 && mgmtObject != 0)
childObj->setReference(mgmtObject->getObjectId());
}
@@ -1311,9 +1310,9 @@ void Queue::countLoadedFromDisk(uint64_t size) const
}
-ManagementObject* Queue::GetManagementObject (void) const
+ManagementObject::shared_ptr Queue::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext)
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index f6ea6e0adb..32e9201b5b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -151,8 +151,8 @@ class Queue : public boost::enable_shared_from_this<Queue>,
std::string alternateExchangeName;
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
- qmf::org::apache::qpid::broker::Queue* mgmtObject;
- qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
+ qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
@@ -339,7 +339,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const;
// Manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const;
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index c52cdee6a4..944cc7e838 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -68,7 +68,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
: StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
- flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
+ flowStopped(false), count(0), size(0), broker(0)
{
uint32_t maxCount(0);
uint64_t maxSize(0);
@@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue,
if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
broker = queue->getBroker();
- queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject());
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
if (queueMgmtObj) {
queueMgmtObj->set_flowStopped(isFlowControlActive());
}
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 1bcc388ceb..0e83457efa 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -31,14 +31,8 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
+#include "qmf/org/apache/qpid/broker/Queue.h"
-namespace qmf {
-namespace org {
-namespace apache {
-namespace qpid {
-namespace broker {
- class Queue;
-}}}}}
namespace _qmfBroker = qmf::org::apache::qpid::broker;
namespace qpid {
@@ -118,7 +112,7 @@ struct QueueSettings;
std::map<framing::SequenceNumber, Message > index;
mutable qpid::sys::Mutex indexLock;
- _qmfBroker::Queue *queueMgmtObj;
+ _qmfBroker::Queue::shared_ptr queueMgmtObj;
const Broker *broker;
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index 2d7c820b63..bc7c96f08c 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -424,7 +424,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response)
&challenge, &challenge_len);
processAuthenticationStep(code, challenge, challenge_len);
- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+ qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject();
if ( cnxMgmt )
cnxMgmt->set_saslMechanism(mechanism);
}
@@ -507,7 +507,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr
if (ssf) {
securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize));
}
- qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject();
+ qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject();
if ( cnxMgmt )
cnxMgmt->set_saslSsf(ssf);
return securityLayer;
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 65530394a3..f3d97dc078 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -300,8 +300,7 @@ Consumer(_name, type),
arguments(_arguments),
notifyEnabled(true),
syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)),
- deliveryCount(0),
- mgmtObject(0)
+ deliveryCount(0)
{
if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0)
{
@@ -310,17 +309,17 @@ Consumer(_name, type),
if (agent != 0)
{
- mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
- !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
+ mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(),
+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)));
agent->addObject (mgmtObject);
mgmtObject->set_creditMode("WINDOW");
}
}
}
-ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 67cfe808d0..1d7ccbfa9c 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -96,7 +96,7 @@ class SemanticState : private boost::noncopyable {
bool notifyEnabled;
const int syncFrequency;
int deliveryCount;
- qmf::org::apache::qpid::broker::Subscription* mgmtObject;
+ qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
bool checkCredit(const Message& msg);
void allocateCredit(const Message& msg);
@@ -160,7 +160,7 @@ class SemanticState : private boost::noncopyable {
void acknowledged(const DeliveryRecord&) {}
// manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject*
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
GetManagementObject(void) const;
QPID_BROKER_EXTERN management::Manageable::status_t
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index fe357eb949..42be45f7ce 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -58,7 +58,6 @@ SessionState::SessionState(
broker(b), handler(&h),
semanticState(*this),
adapter(semanticState),
- mgmtObject(0),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
if (!delayManagement) addManagementObject();
@@ -71,8 +70,8 @@ void SessionState::addManagementObject() {
if (parent != 0) {
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent != 0) {
- mgmtObject = new _qmf::Session
- (agent, this, parent, getId().getName());
+ mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session
+ (agent, this, parent, getId().getName()));
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
@@ -149,9 +148,9 @@ void SessionState::giveReadCredit(int32_t credit) {
getConnection().outputTasks.giveReadCredit(credit);
}
-ManagementObject* SessionState::GetManagementObject (void) const
+ManagementObject::shared_ptr SessionState::GetManagementObject (void) const
{
- return (ManagementObject*) mgmtObject;
+ return mgmtObject;
}
Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 5e3a77d7ed..af384ff761 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState,
const qpid::types::Variant::Map& annotations, bool sync);
// Manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
+ management::ManagementObject::shared_ptr GetManagementObject (void) const;
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
@@ -168,7 +168,7 @@ class SessionState : public qpid::SessionState,
SemanticState semanticState;
SessionAdapter adapter;
MessageBuilder msgBuilder;
- qmf::org::apache::qpid::broker::Session* mgmtObject;
+ qmf::org::apache::qpid::broker::Session::shared_ptr mgmtObject;
qpid::framing::SequenceSet accepted;
// sequence numbers for pending received Execution.Sync commands
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index fa8df6406b..8d54427fdc 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -31,7 +31,7 @@ using namespace qpid::broker;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
-System::System (string _dataDir, Broker* broker) : mgmtObject(0)
+System::System (string _dataDir, Broker* broker)
{
ManagementAgent* agent = broker ? broker->getManagementAgent() : 0;
@@ -64,7 +64,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
}
}
- mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
+ mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array())));
qpid::sys::SystemInfo::getSystemId (osName,
nodeName,
release,
diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h
index 6847c662ae..591d2a14a6 100644
--- a/qpid/cpp/src/qpid/broker/System.h
+++ b/qpid/cpp/src/qpid/broker/System.h
@@ -35,7 +35,7 @@ class System : public management::Manageable
{
private:
- qmf::org::apache::qpid::broker::System* mgmtObject;
+ qmf::org::apache::qpid::broker::System::shared_ptr mgmtObject;
framing::Uuid systemId;
std::string osName, nodeName, release, version, machine;
@@ -45,7 +45,7 @@ class System : public management::Manageable
System (std::string _dataDir, Broker* broker = 0);
- management::ManagementObject* GetManagementObject (void) const
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp
index a9ca3b42ab..e72118b570 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.cpp
+++ b/qpid/cpp/src/qpid/broker/Vhost.cpp
@@ -29,7 +29,7 @@ namespace qpid { namespace management {
class Manageable;
}}
-Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0)
+Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker)
{
if (parentBroker != 0 && broker != 0)
{
@@ -37,7 +37,7 @@ Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmt
if (agent != 0)
{
- mgmtObject = new _qmf::Vhost(agent, this, parentBroker, "/");
+ mgmtObject = _qmf::Vhost::shared_ptr(new _qmf::Vhost(agent, this, parentBroker, "/"));
agent->addObject(mgmtObject, 0, true);
}
}
diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h
index 9554d641c2..599b821870 100644
--- a/qpid/cpp/src/qpid/broker/Vhost.h
+++ b/qpid/cpp/src/qpid/broker/Vhost.h
@@ -32,7 +32,7 @@ class Vhost : public management::Manageable
{
private:
- qmf::org::apache::qpid::broker::Vhost* mgmtObject;
+ qmf::org::apache::qpid::broker::Vhost::shared_ptr mgmtObject;
public:
@@ -40,7 +40,7 @@ class Vhost : public management::Manageable
Vhost (management::Manageable* parentBroker, Broker* broker = 0);
- management::ManagementObject* GetManagementObject (void) const
+ management::ManagementObject::shared_ptr GetManagementObject (void) const
{ return mgmtObject; }
void setFederationTag(const std::string& tag);
};
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 877fa021d0..c3d0598249 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -64,7 +64,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
systemId(broker.getSystem()->getSystemId().data()),
settings(s),
observer(new ConnectionObserver(*this, systemId)),
- mgmtObject(0),
status(STANDALONE),
membership(systemId),
replicationTest(s.replicateDefault.get())
@@ -95,7 +94,7 @@ void HaBroker::initialize() {
if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 48db0a8d3c..530211ced4 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -71,7 +71,7 @@ class HaBroker : public management::Manageable
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
@@ -124,7 +124,7 @@ class HaBroker : public management::Manageable
boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
boost::shared_ptr<Backup> backup;
boost::shared_ptr<Primary> primary;
- qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
+ qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
BrokerStatus status;
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index ce4e545c80..b933c71bbb 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -91,9 +91,8 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
void RemoteBackup::ready(const QueuePtr& q) {
catchupQueues.erase(q);
- QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
- << QueueSetPrinter(", waiting for: ", catchupQueues));
- if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
+ QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", "
+ << catchupQueues.size() << " remain to catch up");
}
// Called via ConfigurationObserver::queueCreate and from catchupQueue
diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
index 01fceb7783..17613ce3dd 100644
--- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp
@@ -87,8 +87,8 @@ void StatusCheckThread::run() {
string status = details["status"].getString();
if (status != "joining") {
statusCheck.setPromote(false);
- QPID_LOG(error, statusCheck.logPrefix << "Broker " << url << " status is " << status
- << " this broker will refuse promotion.");
+ QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
+ << status << ", this broker will refuse promotion.");
}
QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 474c86ed48..8e19b68284 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -62,6 +62,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace {
+ const size_t qmfV1BufferSize(65536);
const string defaultVendorName("vendor");
const string defaultProductName("product");
@@ -113,9 +114,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]");
if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
- agent.deleteObjectNowLH(mgmtObject->getObjectId());
- delete mgmtObject;
- mgmtObject = 0;
+ agent.deleteObjectNow(mgmtObject->getObjectId());
+ mgmtObject.reset();
}
}
@@ -124,8 +124,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
- qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
- msgBuffer(MA_BUFFER_SIZE), memstat(0)
+ qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100)
{
nextObjectId = 1;
brokerBank = 1;
@@ -136,7 +135,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
attrMap["_vendor"] = defaultVendorName;
attrMap["_product"] = defaultProductName;
- memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker");
+ memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"));
addObject(memstat, "amqp-broker");
}
@@ -155,15 +154,6 @@ ManagementAgent::~ManagementAgent ()
v2Direct.reset();
remoteAgents.clear();
-
- moveNewObjectsLH();
- for (ManagementObjectMap::iterator iter = managementObjects.begin ();
- iter != managementObjects.end ();
- iter++) {
- ManagementObject* object = iter->second;
- delete object;
- }
- managementObjects.clear();
}
}
@@ -316,11 +306,12 @@ void ManagementAgent::registerEvent (const string& packageName,
}
// Deprecated: V1 objects
-ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent)
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent)
{
uint16_t sequence;
uint64_t objectNum;
+ sys::Mutex::ScopedLock lock(addLock);
sequence = persistent ? 0 : bootSequence;
objectNum = persistId ? persistId : nextObjectId++;
@@ -329,17 +320,14 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
object->setObjectId(objId);
- {
- sys::Mutex::ScopedLock lock(addLock);
- newManagementObjects.push_back(object);
- }
+ newManagementObjects.push_back(object);
QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key());
return objId;
}
-ObjectId ManagementAgent::addObject(ManagementObject* object,
+ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object,
const string& key,
bool persistent)
{
@@ -369,12 +357,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
"emerg", "alert", "crit", "error", "warn",
"note", "info", "debug"
};
- sys::Mutex::ScopedLock lock (userLock);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
if (qmf1Support) {
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ char buffer[qmfV1BufferSize];
+ Buffer outBuffer(buffer, qmfV1BufferSize);
encodeHeader(outBuffer, 'e');
outBuffer.putShortString(event.getPackageName());
@@ -385,9 +372,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
string sBuf;
event.encode(sBuf);
outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange,
+ sendBuffer(outBuffer, mExchange,
"console.event.1.0." + event.getPackageName() + "." + event.getEventName());
QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName());
}
@@ -426,7 +411,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
Variant::List list_;
list_.push_back(map_);
ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str());
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str());
QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName());
}
}
@@ -480,12 +465,9 @@ void ManagementAgent::clientAdded (const string& routingKey)
while (rkeys.size()) {
char localBuffer[16];
Buffer outBuffer(localBuffer, 16);
- uint32_t outLen;
encodeHeader(outBuffer, 'x');
- outLen = outBuffer.getPosition();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, rkeys.front());
+ sendBuffer(outBuffer, dExchange, rkeys.front());
QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front());
rkeys.pop_front();
}
@@ -496,8 +478,8 @@ void ManagementAgent::clusterUpdate() {
// Set clientWasAdded so that on the next periodicProcessing we will do
// a full update on all cluster members.
sys::Mutex::ScopedLock l(userLock);
- moveNewObjectsLH(); // keep lists consistent with updater/updatee.
- moveDeletedObjectsLH();
+ moveNewObjects(); // keep lists consistent with updater/updatee.
+ moveDeletedObjects();
clientWasAdded = true;
debugSnapshot("Cluster member joined");
}
@@ -523,12 +505,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq)
return h1 == 'A' && h2 == 'M' && h3 == '2';
}
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey)
+void ManagementAgent::sendBuffer(Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey)
{
if (suppressed) {
QPID_LOG(debug, "Suppressing management message to " << routingKey);
@@ -541,6 +520,8 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
AMQFrame header((AMQHeaderBody()));
AMQFrame content((AMQContentBody()));
+ size_t length = buf.getPosition();
+ buf.reset();
content.castBody<AMQContentBody>()->decode(buf, length);
method.setEof(false);
@@ -564,38 +545,31 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
Message msg(transfer, transfer);
msg.setIsManagementMessage(true);
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg, 0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
- }
+ DeliverableMessage deliverable (msg, 0);
+ try {
+ exchange->route(deliverable);
+ } catch(exception&) {}
buf.reset();
}
-void ManagementAgent::sendBufferLH(Buffer& buf,
- uint32_t length,
- const string& exchange,
- const string& routingKey)
+void ManagementAgent::sendBuffer(Buffer& buf,
+ const string& exchange,
+ const string& routingKey)
{
qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
if (ex.get() != 0)
- sendBufferLH(buf, length, ex, routingKey);
+ sendBuffer(buf, ex, routingKey);
}
-// NOTE WELL: assumes userLock is held by caller (LH)
-// NOTE EVEN WELLER: drops this lock when delivering the message!!!
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const string& routingKey,
- uint64_t ttl_msec)
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
{
Variant::Map::const_iterator i;
@@ -643,34 +617,30 @@ void ManagementAgent::sendBufferLH(const string& data,
msg.setIsManagementMessage(true);
msg.computeExpiration(broker->getExpiryPolicy());
- {
- sys::Mutex::ScopedUnlock u(userLock);
-
- DeliverableMessage deliverable (msg, 0);
- try {
- exchange->route(deliverable);
- } catch(exception&) {}
- }
+ DeliverableMessage deliverable (msg,0);
+ try {
+ exchange->route(deliverable);
+ } catch(exception&) {}
}
-void ManagementAgent::sendBufferLH(const string& data,
- const string& cid,
- const Variant::Map& headers,
- const string& content_type,
- const string& exchange,
- const string& routingKey,
- uint64_t ttl_msec)
+void ManagementAgent::sendBuffer(const string& data,
+ const string& cid,
+ const Variant::Map& headers,
+ const string& content_type,
+ const string& exchange,
+ const string& routingKey,
+ uint64_t ttl_msec)
{
qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange));
if (ex.get() != 0)
- sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec);
+ sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec);
}
/** Objects that have been added since the last periodic poll are temporarily
* saved in the newManagementObjects list. This allows objects to be
- * added without needing to block on the userLock (addLock is used instead).
+ * added without needing to block on the userLock (objectLock is used instead).
* These new objects need to be integrated into the object database
* (managementObjects) *before* they can be properly managed. This routine
* performs the integration.
@@ -680,34 +650,33 @@ void ManagementAgent::sendBufferLH(const string& data,
* duplicate object ids. To avoid clashes, don't put deleted objects
* into the active object database.
*/
-void ManagementAgent::moveNewObjectsLH()
+void ManagementAgent::moveNewObjects()
{
- sys::Mutex::ScopedLock lock (addLock);
+ sys::Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock objLock (objectLock);
while (!newManagementObjects.empty()) {
- ManagementObject *object = newManagementObjects.back();
+ ManagementObject::shared_ptr object = newManagementObjects.back();
newManagementObjects.pop_back();
if (object->isDeleted()) {
DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support));
pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete object;
} else { // add to active object list, check for duplicates.
ObjectId oid = object->getObjectId();
ManagementObjectMap::iterator destIter = managementObjects.find(oid);
if (destIter != managementObjects.end()) {
// duplicate found. It is OK if the old object has been marked
// deleted, just replace the old with the new.
- ManagementObject *oldObj = destIter->second;
- if (oldObj->isDeleted()) {
- DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
- pendingDeletedObjs[dptr->getKey()].push_back(dptr);
- delete oldObj;
- } else {
+ ManagementObject::shared_ptr oldObj = destIter->second;
+ if (!oldObj->isDeleted()) {
// Duplicate non-deleted objects? This is a user error - oids must be unique.
// for now, leak the old object (safer than deleting - may still be referenced)
// and complain loudly...
QPID_LOG(error, "Detected two management objects with the same identifier: " << oid);
+ oldObj->resourceDestroy();
}
+ DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support));
+ pendingDeletedObjs[dptr->getKey()].push_back(dptr);
// QPID-3666: be sure to replace the -index- also, as non-key members of
// the index object may be different for the new object! So erase the
// entry, rather than []= assign here:
@@ -720,32 +689,41 @@ void ManagementAgent::moveNewObjectsLH()
void ManagementAgent::periodicProcessing (void)
{
-#define BUFSIZE 65536
#define HEADROOM 4096
debugSnapshot("Management agent periodic processing");
sys::Mutex::ScopedLock lock (userLock);
- uint32_t contentSize;
string routingKey;
string sBuf;
- moveNewObjectsLH();
+ moveNewObjects();
//
// If we're publishing updates, get the latest memory statistics and uptime now
//
if (publish) {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
- qpid::sys::MemStat::loadMemInfo(memstat);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
+ }
+
+ //
+ // Use a copy of the management object map to avoid holding the objectLock
+ //
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
}
//
// Clear the been-here flag on all objects in the map.
//
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
iter++) {
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = *iter;
object->setFlags(0);
if (clientWasAdded) {
object->setForcePublish(true);
@@ -760,22 +738,25 @@ void ManagementAgent::periodicProcessing (void)
// if we sent the active update first, _then_ the delete update, clients
// would incorrectly think the object was deleted. See QPID-2997
//
- bool objectsDeleted = moveDeletedObjectsLH();
+ bool objectsDeleted = moveDeletedObjects();
+ PendingDeletedObjsMap localPendingDeletedObjs;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ localPendingDeletedObjs.swap(pendingDeletedObjs);
+ }
//
// If we are not publishing updates, just clear the pending deletes. There's no
// need to tell anybody.
//
if (!publish)
- pendingDeletedObjs.clear();
-
- if (!pendingDeletedObjs.empty()) {
- // use a temporary copy of the pending deletes so dropping the lock when
- // the buffer is sent is safe.
- PendingDeletedObjsMap tmp(pendingDeletedObjs);
- pendingDeletedObjs.clear();
+ localPendingDeletedObjs.clear();
- for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) {
+ ResizableBuffer msgBuffer(qmfV1BufferSize);
+ if (!localPendingDeletedObjs.empty()) {
+ for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin();
+ mIter != localPendingDeletedObjs.end();
+ mIter++) {
std::string packageName;
std::string className;
msgBuffer.reset();
@@ -807,11 +788,10 @@ void ManagementAgent::periodicProcessing (void)
}
if (v1Objs >= maxReplyObjs) {
v1Objs = 0;
- contentSize = msgBuffer.getSize();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to="
<< key.str() << " len=" << contentSize);
}
@@ -840,7 +820,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
@@ -850,11 +830,10 @@ void ManagementAgent::periodicProcessing (void)
// send any remaining objects...
if (v1Objs) {
- contentSize = BUFSIZE - msgBuffer.available();
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize);
}
@@ -877,7 +856,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length());
}
}
@@ -885,9 +864,7 @@ void ManagementAgent::periodicProcessing (void)
}
//
- // Process the entire object map. Remember: we drop the userLock each time we call
- // sendBuffer(). This allows the managementObjects map to be altered during the
- // sendBuffer() call, so always restart the search after a sendBuffer() call
+ // Process the entire object map.
//
// If publish is disabled, don't send any updates.
//
@@ -897,14 +874,14 @@ void ManagementAgent::periodicProcessing (void)
uint32_t pcount;
uint32_t scount;
uint32_t v1Objs, v2Objs;
- ManagementObjectMap::iterator baseIter;
+ ManagementObjectVector::iterator baseIter;
std::string packageName;
std::string className;
- for (baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
+ for (baseIter = localManagementObjects.begin();
+ baseIter != localManagementObjects.end();
baseIter++) {
- ManagementObject* baseObject = baseIter->second;
+ ManagementObject::shared_ptr baseObject = *baseIter;
//
// Skip until we find a base object requiring processing...
//
@@ -915,7 +892,7 @@ void ManagementAgent::periodicProcessing (void)
}
}
- if (baseIter == managementObjects.end())
+ if (baseIter == localManagementObjects.end())
break; // done - all objects processed
pcount = scount = 0;
@@ -924,12 +901,12 @@ void ManagementAgent::periodicProcessing (void)
list_.clear();
msgBuffer.reset();
- for (ManagementObjectMap::iterator iter = baseIter;
- iter != managementObjects.end();
+ for (ManagementObjectVector::iterator iter = baseIter;
+ iter != localManagementObjects.end();
iter++) {
msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space
- ManagementObject* baseObject = baseIter->second;
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr baseObject = *baseIter;
+ ManagementObject::shared_ptr object = *iter;
bool send_stats, send_props;
if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
object->setFlags(1);
@@ -1004,12 +981,11 @@ void ManagementAgent::periodicProcessing (void)
if (pcount || scount) {
if (qmf1Support) {
- contentSize = BUFSIZE - msgBuffer.available();
- if (contentSize > 0) {
+ if (msgBuffer.getPosition() > 0) {
stringstream key;
key << "console.obj.1.0." << packageName << "." << className;
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK
+ size_t contentSize = msgBuffer.getPosition();
+ sendBuffer(msgBuffer, mExchange, key.str());
QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
@@ -1035,7 +1011,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0);
QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str()
<< " props=" << pcount
<< " stats=" << scount
@@ -1045,21 +1021,21 @@ void ManagementAgent::periodicProcessing (void)
}
} // end processing updates for all objects
- if (objectsDeleted) deleteOrphanedAgentsLH();
+ if (objectsDeleted) {
+ sys::Mutex::ScopedLock lock (userLock);
+ deleteOrphanedAgentsLH();
+ }
// heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
if (qmf1Support) {
- uint32_t contentSize;
- char msgChars[BUFSIZE];
- Buffer msgBuffer(msgChars, BUFSIZE);
+ char msgChars[qmfV1BufferSize];
+ Buffer msgBuffer(msgChars, qmfV1BufferSize);
encodeHeader(msgBuffer, 'h');
msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now())));
- contentSize = BUFSIZE - msgBuffer.available ();
- msgBuffer.reset ();
routingKey = "console.heartbeat.1.0";
- sendBufferLH(msgBuffer, contentSize, mExchange, routingKey);
+ sendBuffer(msgBuffer, mExchange, routingKey);
QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey);
}
@@ -1087,23 +1063,26 @@ void ManagementAgent::periodicProcessing (void)
// Set TTL (in msecs) on outgoing heartbeat indications based on the interval
// time to prevent stale heartbeats from getting to the consoles.
- sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
+ sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000);
QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address);
}
}
-void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
+void ManagementAgent::deleteObjectNow(const ObjectId& oid)
{
- ManagementObjectMap::iterator iter = managementObjects.find(oid);
- if (iter == managementObjects.end())
- return;
- ManagementObject* object = iter->second;
- if (!object->isDeleted())
- return;
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(oid);
+ if (iter == managementObjects.end())
+ return;
+ object = iter->second;
+ if (!object->isDeleted())
+ return;
+ managementObjects.erase(oid);
+ }
- // since sendBufferLH drops the userLock, don't call it until we
- // are done manipulating the object.
#define DNOW_BUFSIZE 2048
char msgChars[DNOW_BUFSIZE];
Buffer msgBuffer(msgChars, DNOW_BUFSIZE);
@@ -1139,15 +1118,12 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
v2key << "." << instanceNameKey;
}
- object = 0;
- managementObjects.erase(oid);
+ object.reset();
// object deleted, ok to drop lock now.
if (publish && qmf1Support) {
- uint32_t contentSize = msgBuffer.getPosition();
- msgBuffer.reset();
- sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
+ sendBuffer(msgBuffer, mExchange, v1key.str());
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
@@ -1160,29 +1136,26 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str());
+ sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0);
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str());
}
}
-void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence,
- uint32_t code, const string& text)
+void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence,
+ uint32_t code, const string& text)
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'z', sequence);
outBuffer.putLong (code);
outBuffer.putShortString (text);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" <<
replyToKey << " seq=" << sequence);
}
-void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid,
- const string& text, uint32_t code, bool viaLocal)
+void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid,
+ const string& text, uint32_t code, bool viaLocal)
{
static const string addr_exchange("qmf.default.direct");
@@ -1200,7 +1173,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons
map["_values"] = values;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text);
}
@@ -1211,7 +1184,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const bool topic,
int qmfVersion)
{
- sys::Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
if (topic && qmfVersion == 1) {
@@ -1225,23 +1197,23 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// schema.#
if (routingKey == "broker") {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return false;
}
if (routingKey.length() > 6) {
if (routingKey.compare(0, 9, "agent.1.0") == 0) {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return false;
}
if (routingKey.compare(0, 8, "agent.1.") == 0) {
- return authorizeAgentMessageLH(msg);
+ return authorizeAgentMessage(msg);
}
if (routingKey.compare(0, 7, "schema.") == 0) {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return true;
}
}
@@ -1253,7 +1225,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// Intercept messages bound to:
// "console.ind.locate.# - process these messages, and also allow them to be forwarded.
if (routingKey == "console.request.agent_locate") {
- dispatchAgentCommandLH(msg);
+ dispatchAgentCommand(msg);
return true;
}
@@ -1264,7 +1236,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
// "<name_address>" - the broker agent's proper name
// and do not forward them futher
if (routingKey == "broker" || routingKey == name_address) {
- dispatchAgentCommandLH(msg, routingKey == "broker");
+ dispatchAgentCommand(msg, routingKey == "broker");
return false;
}
}
@@ -1273,16 +1245,15 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
return true;
}
-void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
- moveNewObjectsLH();
+ moveNewObjects();
string methodName;
string packageName;
string className;
uint8_t hash[16];
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
AclModule* acl = broker->getAcl();
string inArgs;
@@ -1304,9 +1275,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (disallowAllV1Methods) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2");
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence);
return;
}
@@ -1315,9 +1284,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (i != disallowed.end()) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(i->second);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence);
return;
}
@@ -1331,30 +1298,34 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
return;
}
}
- ManagementObjectMap::iterator iter = numericFind(objId);
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (!object || object->isDeleted()) {
outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT));
} else {
- if ((iter->second->getPackageName() != packageName) ||
- (iter->second->getClassName() != className)) {
+ if ((object->getPackageName() != packageName) ||
+ (object->getClassName() != className)) {
outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID);
outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID));
}
else {
uint32_t pos = outBuffer.getPosition();
try {
- sys::Mutex::ScopedUnlock u(userLock);
string outBuf;
- iter->second->doMethod(methodName, inArgs, outBuf, userId);
+ object->doMethod(methodName, inArgs, outBuf, userId);
outBuffer.putRawData(outBuf);
} catch(exception& e) {
outBuffer.setPosition(pos);;
@@ -1364,17 +1335,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl
}
}
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
-void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk,
- const string& cid, const ConnectionToken* connToken, bool viaLocal)
+void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk,
+ const string& cid, const ConnectionToken* connToken, bool viaLocal)
{
- moveNewObjectsLH();
+ moveNewObjects();
string methodName;
Variant::Map inMap;
@@ -1393,8 +1362,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
if ((oid = inMap.find("_object_id")) == inMap.end() ||
(mid = inMap.find("_method_name")) == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
- Manageable::STATUS_PARAMETER_INVALID, viaLocal);
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
+ Manageable::STATUS_PARAMETER_INVALID, viaLocal);
return;
}
@@ -1412,16 +1381,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
inArgs = (mid->second).asMap();
}
} catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
- if (iter == managementObjects.end() || iter->second->isDeleted()) {
+ if (!object || object->isDeleted()) {
stringstream estr;
estr << "No object found with ID=" << objId;
- sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal);
+ sendException(rte, rtk, cid, estr.str(), 1, viaLocal);
return;
}
@@ -1429,20 +1404,20 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
AclModule* acl = broker->getAcl();
DisallowedMethods::const_iterator i;
- i = disallowed.find(make_pair(iter->second->getClassName(), methodName));
+ i = disallowed.find(make_pair(object->getClassName(), methodName));
if (i != disallowed.end()) {
- sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
+ sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId();
if (acl != 0) {
map<acl::Property, string> params;
- params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName();
- params[acl::PROP_SCHEMACLASS] = iter->second->getClassName();
+ params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName();
+ params[acl::PROP_SCHEMACLASS] = object->getClassName();
if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, &params)) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
Manageable::STATUS_FORBIDDEN, viaLocal);
return;
}
@@ -1450,13 +1425,12 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
// invoke the method
- QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName()
- << ":" << iter->second->getClassName() << " method=" <<
+ QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName()
+ << ":" << object->getClassName() << " method=" <<
methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs);
try {
- sys::Mutex::ScopedUnlock u(userLock);
- iter->second->doMethod(methodName, inArgs, callMap, userId);
+ object->doMethod(methodName, inArgs, callMap, userId);
errorCode = callMap["_status_code"].asUint32();
if (errorCode == 0) {
outMap["_arguments"] = Variant::Map();
@@ -1467,62 +1441,59 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r
} else
error = callMap["_status_text"].asString();
} catch(exception& e) {
- sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
+ sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal);
return;
}
if (errorCode != 0) {
- sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal);
+ sendException(rte, rtk, cid, error, errorCode, viaLocal);
return;
}
MapCodec::encode(outMap, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap);
}
-void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence)
{
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey);
encodeHeader (outBuffer, 'b', sequence);
uuid.encode (outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey);
}
-void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence)
{
QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey);
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
- for (PackageMap::iterator pIter = packages.begin ();
- pIter != packages.end ();
- pIter++)
{
- encodeHeader (outBuffer, 'p', sequence);
- encodePackageIndication (outBuffer, pIter);
+ sys::Mutex::ScopedLock lock(userLock);
+ for (PackageMap::iterator pIter = packages.begin ();
+ pIter != packages.end ();
+ pIter++)
+ {
+ encodeHeader (outBuffer, 'p', sequence);
+ encodePackageIndication (outBuffer, pIter);
+ }
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence);
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
string packageName;
@@ -1530,10 +1501,11 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT
QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
findOrAddPackageLH(packageName);
}
-void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
string packageName;
@@ -1541,40 +1513,39 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo
QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence);
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end())
+ typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
+ std::list<_ckeyType> classes;
{
- typedef std::pair<SchemaClassKey, uint8_t> _ckeyType;
- std::list<_ckeyType> classes;
- ClassMap &cMap = pIter->second;
- for (ClassMap::iterator cIter = cMap.begin();
- cIter != cMap.end();
- cIter++) {
- if (cIter->second.hasSchema()) {
- classes.push_back(make_pair(cIter->first, cIter->second.kind));
+ sys::Mutex::ScopedLock lock(userLock);
+ PackageMap::iterator pIter = packages.find(packageName);
+ if (pIter != packages.end())
+ {
+ ClassMap &cMap = pIter->second;
+ for (ClassMap::iterator cIter = cMap.begin();
+ cIter != cMap.end();
+ cIter++) {
+ if (cIter->second.hasSchema()) {
+ classes.push_back(make_pair(cIter->first, cIter->second.kind));
+ }
}
}
+ }
- while (classes.size()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
-
- encodeHeader(outBuffer, 'q', sequence);
- encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
+ while (classes.size()) {
+ ResizableBuffer outBuffer(qmfV1BufferSize);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
- QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
- "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
- classes.pop_front();
- }
+ encodeHeader(outBuffer, 'q', sequence);
+ encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second);
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name <<
+ "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence);
+ classes.pop_front();
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t)
+void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t)
{
string packageName;
SchemaClassKey key;
@@ -1587,20 +1558,18 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK
QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << replyToKey);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
ClassMap::iterator cIter = pIter->second.find(key);
if (cIter == pIter->second.end() || !cIter->second.hasSchema()) {
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
uint32_t sequence = nextRequestSequence++;
// Schema Request
encodeHeader (outBuffer, 'S', sequence);
outBuffer.putShortString(packageName);
key.encode(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), to=" << replyToKey << " seq=" << sequence);
@@ -1625,7 +1594,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf)
buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size());
}
-void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
+void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1636,34 +1605,32 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte,
QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) <<
"), replyTo=" << rte << "/" << rtk << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap& cMap = pIter->second;
ClassMap::iterator cIter = cMap.find(key);
if (cIter != cMap.end()) {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
SchemaClass& classInfo = cIter->second;
if (classInfo.hasSchema()) {
encodeHeader(outBuffer, 's', sequence);
classInfo.appendSchema(outBuffer);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
+ sendBuffer(outBuffer, rte, rtk);
QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence);
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Schema not available");
+ sendCommandComplete(rtk, sequence, 1, "Schema not available");
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Class key not found");
+ sendCommandComplete(rtk, sequence, 1, "Class key not found");
}
else
- sendCommandCompleteLH(rtk, sequence, 1, "Package not found");
+ sendCommandComplete(rtk, sequence, 1, "Package not found");
}
-void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
+void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence)
{
string packageName;
SchemaClassKey key;
@@ -1676,6 +1643,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = packages.find(packageName);
if (pIter != packages.end()) {
ClassMap& cMap = pIter->second;
@@ -1690,14 +1658,11 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r
inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length);
// Publish a class-indication message
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
encodeHeader(outBuffer, 'q');
encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.class");
+ sendBuffer(outBuffer, mExchange, "schema.class");
QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" <<
" to=schema.class");
}
@@ -1756,7 +1721,7 @@ void ManagementAgent::deleteOrphanedAgentsLH()
remoteAgents.erase(*dIter);
}
-void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
+void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken)
{
string label;
uint32_t requestedBrokerBank, requestedAgentBank;
@@ -1764,12 +1729,14 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId();
Uuid systemId;
- moveNewObjectsLH();
+ moveNewObjects();
+
+ sys::Mutex::ScopedLock lock(userLock);
deleteOrphanedAgentsLH();
RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef);
if (aIter != remoteAgents.end()) {
// There already exists an agent on this session. Reject the request.
- sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent");
+ sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent");
return;
}
@@ -1788,7 +1755,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
agent->agentBank = assignedBank;
agent->routingKey = replyToKey;
agent->connectionRef = connectionRef;
- agent->mgmtObject = new _qmf::Agent (this, agent.get());
+ agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get()));
agent->mgmtObject->set_connectionRef(agent->connectionRef);
agent->mgmtObject->set_label (label);
agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId());
@@ -1801,25 +1768,22 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep
QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey);
// Send an Attach Response
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'a', sequence);
outBuffer.putLong (brokerBank);
outBuffer.putLong (assignedBank);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank <<
" to=" << replyToKey << " seq=" << sequence);
}
-void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
+void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence)
{
FieldTable ft;
FieldTable::ValuePtr value;
- moveNewObjectsLH();
+ moveNewObjects();
ft.decode(inBuffer);
@@ -1832,11 +1796,17 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
return;
ObjectId selector(value->get<string>());
- ManagementObjectMap::iterator iter = numericFind(selector);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ ManagementObjectMap::iterator iter = numericFind(selector);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+
+ if (object) {
+ ResizableBuffer outBuffer (qmfV1BufferSize);
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -1849,89 +1819,80 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
sBuf.clear();
object->writeStatistics(sBuf, true);
outBuffer.putRawData(sBuf);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
return;
}
string className (value->get<string>());
- std::list<ObjectId>matches;
+ std::list<ManagementObject::shared_ptr> matches;
if (className == "memory")
- qpid::sys::MemStat::loadMemInfo(memstat);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
// build up a set of all objects to be dumped
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName () == className) {
- matches.push_back(object->getObjectId());
+ {
+ sys::Mutex::ScopedLock lock(objectLock);
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject::shared_ptr object = iter->second;
+ if (object->getClassName () == className) {
+ matches.push_back(object);
+ }
}
}
- // send them (as sendBufferLH drops the userLock)
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ // send them
+ ResizableBuffer outBuffer (qmfV1BufferSize);
while (matches.size()) {
- ObjectId objId = matches.front();
- ManagementObjectMap::iterator oIter = managementObjects.find( objId );
- if (oIter != managementObjects.end()) {
- ManagementObject* object = oIter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- if (!object->isDeleted()) {
- string sProps, sStats;
- object->writeProperties(sProps);
- object->writeStatistics(sStats, true);
-
- size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
- if (len > MA_BUFFER_SIZE) {
- QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!");
- } else {
- if (outBuffer.available() < len) { // not enough room in current buffer, send it.
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock
- QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
- continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
- }
- encodeHeader(outBuffer, 'g', sequence);
- outBuffer.putRawData(sProps);
- outBuffer.putRawData(sStats);
+ ManagementObject::shared_ptr object = matches.front();
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ if (!object->isDeleted()) {
+ string sProps, sStats;
+ object->writeProperties(sProps);
+ object->writeStatistics(sStats, true);
+
+ size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes.
+ if (len > qmfV1BufferSize) {
+ QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!");
+ } else {
+ if (outBuffer.available() < len) { // not enough room in current buffer, send it.
+ sendBuffer(outBuffer, dExchange, replyToKey);
+ QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
+ continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted.
}
+ encodeHeader(outBuffer, 'g', sequence);
+ outBuffer.putRawData(sProps);
+ outBuffer.putRawData(sStats);
}
}
matches.pop_front();
}
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- if (outLen) {
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, dExchange, replyToKey);
+ if (outBuffer.getPosition() > 0) {
+ sendBuffer(outBuffer, dExchange, replyToKey);
QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence);
}
- sendCommandCompleteLH(replyToKey, sequence);
+ sendCommandComplete(replyToKey, sequence);
}
-void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
+void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal)
{
- moveNewObjectsLH();
+ moveNewObjects();
Variant::Map inMap;
Variant::Map::const_iterator i;
@@ -1950,17 +1911,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
*/
i = inMap.find("_what");
if (i == inMap.end()) {
- sendExceptionLH(rte, rtk, cid, "_what element missing in Query");
+ sendException(rte, rtk, cid, "_what element missing in Query");
return;
}
if (i->second.getType() != qpid::types::VAR_STRING) {
- sendExceptionLH(rte, rtk, cid, "_what element is not a string");
+ sendException(rte, rtk, cid, "_what element is not a string");
return;
}
if (i->second.asString() != "OBJECT") {
- sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
return;
}
@@ -1984,11 +1945,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
}
if (className == "memory")
- qpid::sys::MemStat::loadMemInfo(memstat);
+ qpid::sys::MemStat::loadMemInfo(memstat.get());
if (className == "broker") {
uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime);
}
/*
@@ -2000,10 +1961,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::List list_;
ObjectId objId(i->second.asMap());
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
-
+ ManagementObject::shared_ptr object;
+ {
+ sys::Mutex::ScopedLock lock (objectLock);
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end())
+ object = iter->second;
+ }
+ if (object) {
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
@@ -2027,7 +1992,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
string content;
ListCodec::encode(list_, content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
return;
}
@@ -2037,10 +2002,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::List _subList;
unsigned int objCount = 0;
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
+ ManagementObjectVector localManagementObjects;
+ {
+ sys::Mutex::ScopedLock objLock(objectLock);
+ std::transform(managementObjects.begin(), managementObjects.end(),
+ std::back_inserter(localManagementObjects),
+ boost::bind(&ManagementObjectMap::value_type::second, _1));
+ }
+
+ for (ManagementObjectVector::iterator iter = localManagementObjects.begin();
+ iter != localManagementObjects.end();
iter++) {
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = *iter;
if (object->getClassName() == className &&
(packageName.empty() || object->getPackageName() == packageName)) {
@@ -2055,7 +2028,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
- iter->first.mapEncode(oidMap);
+ object->getObjectId().mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
@@ -2080,13 +2053,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
string content;
while (_list.size() > 1) {
ListCodec::encode(_list.front().asList(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
_list.pop_front();
QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
}
headers.erase("partial");
ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length());
return;
}
@@ -2094,12 +2067,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
// Unrecognized query - Send empty message to indicate CommandComplete
string content;
ListCodec::encode(Variant::List(), content);
- sendBufferLH(content, cid, headers, "amqp/list", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/list", rte, rtk);
QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk);
}
-void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid)
+void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid)
{
QPID_LOG(debug, "RCVD AgentLocateRequest");
@@ -2117,16 +2090,17 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co
string content;
MapCodec::encode(map, content);
- sendBufferLH(content, cid, headers, "amqp/map", rte, rtk);
+ sendBuffer(content, cid, headers, "amqp/map", rte, rtk);
clientWasAdded = true;
QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
}
-bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
+bool ManagementAgent::authorizeAgentMessage(Message& msg)
{
- Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE);
+ sys::Mutex::ScopedLock lock(userLock);
+ ResizableBuffer inBuffer (qmfV1BufferSize);
uint32_t sequence = 0;
bool methodReq = false;
bool mapMsg = false;
@@ -2140,7 +2114,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
// authorized or not. In this case, return true (authorized) if there is no ACL in place,
// otherwise return false;
//
- if (msg.getContentSize() > MA_BUFFER_SIZE)
+ if (msg.getContentSize() > qmfV1BufferSize)
return broker->getAcl() == 0;
inBuffer.putRawData(msg.getContent());
@@ -2193,11 +2167,11 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
}
// look up schema for object to get package and class name
-
+ sys::Mutex::ScopedLock lock(objectLock);
ManagementObjectMap::iterator iter = managementObjects.find(objId);
if (iter == managementObjects.end() || iter->second->isDeleted()) {
- QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " <<
+ QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " <<
objId);
return false;
}
@@ -2256,19 +2230,16 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
cid = p->getCorrelationId();
if (mapMsg) {
- sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
- Manageable::STATUS_FORBIDDEN, false);
+ sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN),
+ Manageable::STATUS_FORBIDDEN, false);
} else {
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer(qmfV1BufferSize);
encodeHeader(outBuffer, 'm', sequence);
outBuffer.putLong(Manageable::STATUS_FORBIDDEN);
outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN));
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- sendBufferLH(outBuffer, outLen, rte, rtk);
+ sendBuffer(outBuffer, rte, rtk);
}
QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence);
@@ -2280,7 +2251,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg)
return true;
}
-void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
+void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
{
string rte;
string rtk;
@@ -2295,10 +2266,10 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
else
return;
- Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE);
+ ResizableBuffer inBuffer(qmfV1BufferSize);
uint8_t opcode;
- if (msg.getContentSize() > MA_BUFFER_SIZE) {
+ if (msg.getContentSize() > qmfV1BufferSize) {
QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " <<
msg.getContentSize());
return;
@@ -2317,39 +2288,38 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal)
string body;
string cid;
inBuffer.getRawData(body, bufferLen);
+ {
+ if (p && p->hasCorrelationId()) {
+ cid = p->getCorrelationId();
+ }
- if (p && p->hasCorrelationId()) {
- cid = p->getCorrelationId();
+ if (opcode == "_method_request")
+ return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
+ else if (opcode == "_query_request")
+ return handleGetQuery(body, rte, rtk, cid, viaLocal);
+ else if (opcode == "_agent_locate_request")
+ return handleLocateRequest(body, rte, rtk, cid);
}
-
- if (opcode == "_method_request")
- return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal);
- else if (opcode == "_query_request")
- return handleGetQueryLH(body, rte, rtk, cid, viaLocal);
- else if (opcode == "_agent_locate_request")
- return handleLocateRequestLH(body, rte, rtk, cid);
-
QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!");
return;
}
// old preV2 binary messages
-
while (inBuffer.getPosition() < bufferLen) {
uint32_t sequence;
if (!checkHeader(inBuffer, &opcode, &sequence))
return;
- if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence);
- else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence);
- else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence);
- else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence);
- else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
- else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence);
- else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher());
+ if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence);
+ else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence);
+ else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
+ else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
+ else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
+ else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisher());
+ else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence);
+ else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisher());
}
}
@@ -2365,14 +2335,11 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string
QPID_LOG (debug, "ManagementAgent added package " << name);
// Publish a package-indication message
- Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE);
- uint32_t outLen;
+ ResizableBuffer outBuffer (qmfV1BufferSize);
encodeHeader (outBuffer, 'p');
encodePackageIndication (outBuffer, result.first);
- outLen = MA_BUFFER_SIZE - outBuffer.available ();
- outBuffer.reset ();
- sendBufferLH(outBuffer, outLen, mExchange, "schema.package");
+ sendBuffer(outBuffer, mExchange, "schema.package");
QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package");
return result.first;
@@ -2684,7 +2651,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) {
connectionRef.mapDecode(i->second.asMap());
}
- mgmtObject = new _qmf::Agent(&agent, this);
+ mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this));
if ((i = map_.find("_values")) != map_.end()) {
mgmtObject->mapDecodeValues(i->second.asMap());
@@ -2827,8 +2794,8 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
sys::Mutex::ScopedLock lock (userLock);
- moveNewObjectsLH();
- moveDeletedObjectsLH();
+ moveNewObjects();
+ moveDeletedObjects();
// now copy the pending deletes into the outList
for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin();
@@ -2845,15 +2812,15 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList)
void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
{
sys::Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock objLock(objectLock);
// Clear out any existing deleted objects
- moveNewObjectsLH();
+ moveNewObjects();
pendingDeletedObjs.clear();
ManagementObjectMap::iterator i = managementObjects.begin();
// Silently drop any deleted objects left over from receiving the update.
while (i != managementObjects.end()) {
- ManagementObject* object = i->second;
+ ManagementObject::shared_ptr object = i->second;
if (object->isDeleted()) {
- delete object;
managementObjects.erase(i++);
}
else ++i;
@@ -2867,7 +2834,7 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList)
// construct a DeletedObject from a management object.
-ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2)
+ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2)
: packageName(src->getPackageName()),
className(src->getClassName())
{
@@ -2943,14 +2910,17 @@ void ManagementAgent::DeletedObject::encode(std::string& toBuffer)
}
// Remove Deleted objects, and save for later publishing...
-bool ManagementAgent::moveDeletedObjectsLH() {
- typedef vector<pair<ObjectId, ManagementObject*> > DeleteList;
+bool ManagementAgent::moveDeletedObjects() {
+ typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList;
+
+ sys::Mutex::ScopedLock lock (objectLock);
+
DeleteList deleteList;
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
++iter)
{
- ManagementObject* object = iter->second;
+ ManagementObject::shared_ptr object = iter->second;
if (object->isDeleted()) deleteList.push_back(*iter);
}
@@ -2959,13 +2929,12 @@ bool ManagementAgent::moveDeletedObjectsLH() {
iter != deleteList.rend();
iter++)
{
- ManagementObject* delObj = iter->second;
+ ManagementObject::shared_ptr delObj = iter->second;
assert(delObj->isDeleted());
DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support));
pendingDeletedObjs[dptr->getKey()].push_back(dptr);
managementObjects.erase(iter->first);
- delete iter->second;
}
return !deleteList.empty();
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index c7e830dcf5..fba733a984 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -37,6 +37,7 @@
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
#include <memory>
#include <string>
#include <map>
@@ -100,12 +101,12 @@ public:
const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0,
- bool persistent = false);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- const std::string& key,
- bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ uint64_t persistId = 0,
+ bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ const std::string& key,
+ bool persistent = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
@@ -158,7 +159,7 @@ public:
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
- DeletedObject(ManagementObject *, bool v1, bool v2);
+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
DeletedObject( const std::string &encoded );
~DeletedObject() {};
void encode( std::string& toBuffer );
@@ -207,9 +208,9 @@ private:
uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
- qmf::org::apache::qpid::broker::Agent* mgmtObject;
- RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
- ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
@@ -276,7 +277,7 @@ private:
PackageMap packages;
//
- // Protected by userLock
+ // Protected by objectLock
//
ManagementObjectMap managementObjects;
@@ -288,11 +289,11 @@ private:
framing::Uuid uuid;
//
- // Lock hierarchy: If a thread needs to take both addLock and userLock,
- // it MUST take userLock first, then addLock.
+ // Lock ordering: userLock -> addLock -> objectLock
//
sys::Mutex userLock;
sys::Mutex addLock;
+ sys::Mutex objectLock;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
@@ -335,53 +336,45 @@ private:
// list of objects that have been deleted, but have yet to be published
// one final time.
// Indexed by a string composed of the object's package and class name.
- // Protected by userLock.
+ // Protected by objectLock.
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
- framing::ResizableBuffer msgBuffer;
-
//
// Memory statistics object
//
- qmf::org::apache::qpid::broker::Memory *memstat;
+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
void writeData ();
void periodicProcessing (void);
- void deleteObjectNowLH(const ObjectId& oid);
+ void deleteObjectNow(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- const std::string& exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void moveNewObjectsLH();
- bool moveDeletedObjectsLH();
-
- bool authorizeAgentMessageLH(qpid::broker::Message& msg);
- void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
+ void sendBuffer(framing::Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
+ void sendBuffer(framing::Buffer& buf,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void moveNewObjects();
+ bool moveDeletedObjects();
+
+ bool authorizeAgentMessage(qpid::broker::Message& msg);
+ void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
@@ -399,22 +392,22 @@ private:
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
- void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
- uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
- void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+ void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+ uint32_t code = 0, const std::string& text = "OK");
+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index 02aa87f876..9c21e51a18 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -41,751 +41,700 @@ using namespace qpid::types;
namespace qpid {
- namespace tests {
-
- namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test;
- namespace {
-
- typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr;
- typedef std::vector<TestObjectPtr> TestObjectVector;
-
- // Instantiates a broker and its internal management agent. Provides
- // factories for constructing Receivers for object indication messages.
- //
- class AgentFixture
- {
- MessagingFixture *mFix;
-
- public:
- AgentFixture( unsigned int pubInterval=10,
- bool qmfV2=false,
- qpid::broker::Broker::Options opts = qpid::broker::Broker::Options())
- {
- opts.enableMgmt=true;
- opts.qmf2Support=qmfV2;
- opts.mgmtPubInterval=pubInterval;
- mFix = new MessagingFixture(opts, true);
-
- _qmf::TestObject::registerSelf(getBrokerAgent());
- };
- ~AgentFixture()
- {
- delete mFix;
- };
- ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); }
- Receiver createV1DataIndRcvr( const std::string package, const std::string klass )
- {
- return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, "
- "node: {type: queue, "
- "x-bindings: [{exchange: qpid.management, "
- "key: 'console.obj.1.0.")
- + package + std::string(".") + klass
- + std::string("'}]}}"));
- };
- Receiver createV2DataIndRcvr( const std::string package, const std::string klass )
- {
- std::string p(package);
- std::replace(p.begin(), p.end(), '.', '_');
- std::string k(klass);
- std::replace(k.begin(), k.end(), '.', '_');
-
- return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, "
- "node: {type: queue, "
- "x-bindings: [{exchange: qmf.default.topic, "
- "key: 'agent.ind.data.")
- + p + std::string(".") + k
- + std::string("'}]}}"));
- };
- };
-
-
- // A "management object" that supports the TestObject
- //
- class TestManageable : public qpid::management::Manageable
- {
- management::ManagementObject* mgmtObj;
- const std::string key;
- public:
- TestManageable(management::ManagementAgent *agent, std::string _key)
- : key(_key)
- {
- _qmf::TestObject *tmp = new _qmf::TestObject(agent, this);
-
- // seed it with some default values...
- tmp->set_string1(key);
- tmp->set_bool1(true);
- qpid::types::Variant::Map vMap;
- vMap["one"] = qpid::types::Variant(1);
- vMap["two"] = qpid::types::Variant("two");
- vMap["three"] = qpid::types::Variant("whatever");
- tmp->set_map1(vMap);
-
- mgmtObj = tmp;
- };
- ~TestManageable() { mgmtObj = 0; /* deleted by agent on shutdown */ };
- management::ManagementObject* GetManagementObject() const { return mgmtObj; };
- static void validateTestObjectProperties(_qmf::TestObject& to)
- {
- // verify the default values are as expected. We don't check 'string1',
- // as it is the object key, and is unique for each object (no default value).
- BOOST_CHECK(to.get_bool1() == true);
- BOOST_CHECK(to.get_map1().size() == 3);
- qpid::types::Variant::Map mappy = to.get_map1();
- BOOST_CHECK(1 == (unsigned int)mappy["one"]);
- BOOST_CHECK(mappy["two"].asString() == std::string("two"));
- BOOST_CHECK(mappy["three"].asString() == std::string("whatever"));
- };
- };
-
-
- // decode a V1 Content Indication message
- //
- void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen)
- {
- const size_t MAX_BUFFER_SIZE=65536;
- char tmp[MAX_BUFFER_SIZE];
-
- objs.clear();
-
- BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE);
-
- ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size());
- Buffer buf(tmp, inMsg.getContent().size());
-
- while (buf.available() > 8) { // 8 == qmf v1 header size
- BOOST_CHECK_EQUAL(buf.getOctet(), 'A');
- BOOST_CHECK_EQUAL(buf.getOctet(), 'M');
- BOOST_CHECK_EQUAL(buf.getOctet(), '2');
- BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication
- // @@todo: kag: how do we skip 'i' entries???
- buf.getLong(); // ignore sequence
-
- std::string str1; // decode content body as string
- buf.getRawData(str1, objLen);
-
- TestObjectPtr fake(new _qmf::TestObject(0,0));
- fake->readProperties( str1 );
- objs.push_back(fake);
- }
- }
-
-
- // decode a V2 Content Indication message
- //
- void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs)
- {
- objs.clear();
+namespace tests {
+
+namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test;
+namespace {
+
+typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr;
+typedef std::vector<TestObjectPtr> TestObjectVector;
+
+// Instantiates a broker and its internal management agent. Provides
+// factories for constructing Receivers for object indication messages.
+//
+class AgentFixture
+{
+ MessagingFixture *mFix;
+
+ public:
+ AgentFixture( unsigned int pubInterval=10,
+ bool qmfV2=false,
+ qpid::broker::Broker::Options opts = qpid::broker::Broker::Options())
+ {
+ opts.enableMgmt=true;
+ opts.qmf2Support=qmfV2;
+ opts.mgmtPubInterval=pubInterval;
+ mFix = new MessagingFixture(opts, true);
+
+ _qmf::TestObject::registerSelf(getBrokerAgent());
+ };
+ ~AgentFixture()
+ {
+ delete mFix;
+ };
+ ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); }
+ Receiver createV1DataIndRcvr( const std::string package, const std::string klass )
+ {
+ return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, "
+ "node: {type: queue, "
+ "x-bindings: [{exchange: qpid.management, "
+ "key: 'console.obj.1.0.")
+ + package + std::string(".") + klass
+ + std::string("'}]}}"));
+ };
+ Receiver createV2DataIndRcvr( const std::string package, const std::string klass )
+ {
+ std::string p(package);
+ std::replace(p.begin(), p.end(), '.', '_');
+ std::string k(klass);
+ std::replace(k.begin(), k.end(), '.', '_');
+
+ return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, "
+ "node: {type: queue, "
+ "x-bindings: [{exchange: qmf.default.topic, "
+ "key: 'agent.ind.data.")
+ + p + std::string(".") + k
+ + std::string("'}]}}"));
+ };
+};
+
+
+// A "management object" that supports the TestObject
+//
+class TestManageable : public qpid::management::Manageable
+{
+ management::ManagementObject::shared_ptr mgmtObj;
+ const std::string key;
+ public:
+ TestManageable(management::ManagementAgent *agent, std::string _key)
+ : key(_key)
+ {
+ _qmf::TestObject::shared_ptr tmp(new _qmf::TestObject(agent, this));
+
+ // seed it with some default values...
+ tmp->set_string1(key);
+ tmp->set_bool1(true);
+ qpid::types::Variant::Map vMap;
+ vMap["one"] = qpid::types::Variant(1);
+ vMap["two"] = qpid::types::Variant("two");
+ vMap["three"] = qpid::types::Variant("whatever");
+ tmp->set_map1(vMap);
+
+ mgmtObj = tmp;
+ };
+ ~TestManageable() { mgmtObj.reset(); }
+ management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; };
+ static void validateTestObjectProperties(_qmf::TestObject& to)
+ {
+ // verify the default values are as expected. We don't check 'string1',
+ // as it is the object key, and is unique for each object (no default value).
+ BOOST_CHECK(to.get_bool1() == true);
+ BOOST_CHECK(to.get_map1().size() == 3);
+ qpid::types::Variant::Map mappy = to.get_map1();
+ BOOST_CHECK(1 == (unsigned int)mappy["one"]);
+ BOOST_CHECK(mappy["two"].asString() == std::string("two"));
+ BOOST_CHECK(mappy["three"].asString() == std::string("whatever"));
+ };
+};
+
+
+// decode a V1 Content Indication message
+//
+void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen)
+{
+ const size_t MAX_BUFFER_SIZE=65536;
+ char tmp[MAX_BUFFER_SIZE];
+
+ objs.clear();
+
+ BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE);
+
+ ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size());
+ Buffer buf(tmp, inMsg.getContent().size());
+
+ while (buf.available() > 8) { // 8 == qmf v1 header size
+ BOOST_CHECK_EQUAL(buf.getOctet(), 'A');
+ BOOST_CHECK_EQUAL(buf.getOctet(), 'M');
+ BOOST_CHECK_EQUAL(buf.getOctet(), '2');
+ BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication
+ // @@todo: kag: how do we skip 'i' entries???
+ buf.getLong(); // ignore sequence
+
+ std::string str1; // decode content body as string
+ buf.getRawData(str1, objLen);
+
+ TestObjectPtr fake(new _qmf::TestObject(0,0));
+ fake->readProperties( str1 );
+ objs.push_back(fake);
+ }
+}
- BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list"));
- const ::qpid::types::Variant::Map& m = inMsg.getProperties();
- Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode"));
- BOOST_CHECK(iter != m.end());
- BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication"));
+// decode a V2 Content Indication message
+//
+void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs)
+{
+ objs.clear();
- Variant::List vList;
- ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList);
+ BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list"));
- for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) {
- TestObjectPtr fake(new _qmf::TestObject(0,0));
- fake->readTimestamps(lIter->asMap());
- fake->mapDecodeValues((lIter->asMap())["_values"].asMap());
- objs.push_back(fake);
- }
- }
- }
+ const ::qpid::types::Variant::Map& m = inMsg.getProperties();
+ Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode"));
+ BOOST_CHECK(iter != m.end());
+ BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication"));
- QPID_AUTO_TEST_SUITE(BrokerMgmtAgent)
+ Variant::List vList;
+ ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList);
- // verify that an object that is added to the broker's management database is
- // published correctly. Furthermore, verify that it is published once after
- // it has been deleted.
- //
- QPID_AUTO_TEST_CASE(v1ObjPublish)
- {
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
+ for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) {
+ TestObjectPtr fake(new _qmf::TestObject(0,0));
+ fake->readTimestamps(lIter->asMap());
+ fake->mapDecodeValues((lIter->asMap())["_values"].asMap());
+ objs.push_back(fake);
+ }
+}
+}
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("obj1"));
- uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
+QPID_AUTO_TEST_SUITE(BrokerMgmtAgent)
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+// verify that an object that is added to the broker's management database is
+// published correctly. Furthermore, verify that it is published once after
+// it has been deleted.
+//
+QPID_AUTO_TEST_CASE(v1ObjPublish)
+{
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
- agent->addObject(tm->GetManagementObject(), 1);
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent, std::string("obj1"));
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ agent->addObject(tm->GetManagementObject(), 1);
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
- TestManageable::validateTestObjectProperties(**oIter);
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted
- }
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- // destroy the object
+ TestManageable::validateTestObjectProperties(**oIter);
- tm->GetManagementObject()->resourceDestroy();
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted
+ }
- // wait for the deleted object to be published
+ // destroy the object
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+ tm->GetManagementObject()->resourceDestroy();
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ // wait for the deleted object to be published
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
- TestManageable::validateTestObjectProperties(**oIter);
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- BOOST_CHECK(isDeleted);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
- delete tm;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
}
+ }
- // Repeat the previous test, but with V2-based object support
- //
- QPID_AUTO_TEST_CASE(v2ObjPublish)
- {
- AgentFixture* fix = new AgentFixture(3, true);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
+ BOOST_CHECK(isDeleted);
- TestManageable *tm = new TestManageable(agent, std::string("obj2"));
+ r1.close();
+ delete fix;
+ delete tm;
+}
- Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#");
+// Repeat the previous test, but with V2-based object support
+//
+QPID_AUTO_TEST_CASE(v2ObjPublish)
+{
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
- agent->addObject(tm->GetManagementObject(), "testobj-1");
+ TestManageable *tm = new TestManageable(agent, std::string("obj2"));
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+ Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#");
- TestObjectVector objs;
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
+ agent->addObject(tm->GetManagementObject(), "testobj-1");
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
- TestManageable::validateTestObjectProperties(**oIter);
+ TestObjectVector objs;
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- BOOST_CHECK(0 == mappy["_delete_ts"].asUint64());
- }
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- // destroy the object
+ TestManageable::validateTestObjectProperties(**oIter);
- tm->GetManagementObject()->resourceDestroy();
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ BOOST_CHECK(0 == mappy["_delete_ts"].asUint64());
+ }
- // wait for the deleted object to be published
+ // destroy the object
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+ tm->GetManagementObject()->resourceDestroy();
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
+ // wait for the deleted object to be published
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
- TestManageable::validateTestObjectProperties(**oIter);
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- BOOST_CHECK(isDeleted);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
- delete tm;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
}
+ }
+ BOOST_CHECK(isDeleted);
- // verify that a deleted object is exported correctly using the
- // exportDeletedObjects() method. V1 testcase.
- //
- QPID_AUTO_TEST_CASE(v1ExportDelObj)
- {
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("myObj"));
- uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
-
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ r1.close();
+ delete fix;
+ delete tm;
+}
- agent->addObject(tm->GetManagementObject(), 1);
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+// verify that a deleted object is exported correctly using the
+// exportDeletedObjects() method. V1 testcase.
+//
+QPID_AUTO_TEST_CASE(v1ExportDelObj)
+{
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent, std::string("myObj"));
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
- // destroy the object, then immediately export (before the next poll cycle)
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
+ agent->addObject(tm->GetManagementObject(), 1);
- // wait for the deleted object to be published
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ // destroy the object, then immediately export (before the next poll cycle)
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
- TestManageable::validateTestObjectProperties(**oIter);
+ // wait for the deleted object to be published
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
- BOOST_CHECK(isDeleted);
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- // verify there are no deleted objects to export now.
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
- delete tm;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
}
+ }
+ BOOST_CHECK(isDeleted);
- // verify that a deleted object is imported correctly using the
- // importDeletedObjects() method. V1 testcase.
- //
- QPID_AUTO_TEST_CASE(v1ImportDelObj)
- {
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
+ // verify there are no deleted objects to export now.
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("anObj"));
- uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ r1.close();
+ delete fix;
+ delete tm;
+}
- agent->addObject(tm->GetManagementObject(), 1);
- // wait for the object to be published
- Message m1;
- BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
+// verify that a deleted object is imported correctly using the
+// importDeletedObjects() method. V1 testcase.
+//
+QPID_AUTO_TEST_CASE(v1ImportDelObj)
+{
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent, std::string("anObj"));
+ uint32_t objLen = tm->GetManagementObject()->writePropertiesSize();
- // destroy the object, then immediately export (before the next poll cycle)
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
+ agent->addObject(tm->GetManagementObject(), 1);
- // destroy the broker, and reinistantiate a new one without populating it
- // with a TestObject.
+ // wait for the object to be published
+ Message m1;
+ BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6));
- r1.close();
- delete fix;
- delete tm; // should no longer be necessary
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- fix = new AgentFixture(3);
- r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
+ // destroy the object, then immediately export (before the next poll cycle)
- // wait for the deleted object to be published
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
- bool isDeleted = false;
- while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with a TestObject.
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ r1.close();
+ delete fix;
+ delete tm; // should no longer be necessary
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ fix = new AgentFixture(3);
+ r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
- TestManageable::validateTestObjectProperties(**oIter);
+ // wait for the deleted object to be published
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- isDeleted = true;
- }
- }
+ bool isDeleted = false;
+ while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) {
- BOOST_CHECK(isDeleted);
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- // verify there are no deleted objects to export now.
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ isDeleted = true;
}
+ }
+ BOOST_CHECK(isDeleted);
- // verify that an object that is added and deleted prior to the
- // first poll cycle is accounted for by the export
- //
- QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
- {
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
+ // verify there are no deleted objects to export now.
- // create a manageable test object
- TestManageable *tm = new TestManageable(agent, std::string("objectifyMe"));
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
- // add, then immediately delete and export the object...
+ r1.close();
+ delete fix;
+}
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->addObject(tm->GetManagementObject(), 999);
- tm->GetManagementObject()->resourceDestroy();
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 1);
- delete fix;
- delete tm;
- }
+// verify that an object that is added and deleted prior to the
+// first poll cycle is accounted for by the export
+//
+QPID_AUTO_TEST_CASE(v1ExportFastDelObj)
+{
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+ // create a manageable test object
+ TestManageable *tm = new TestManageable(agent, std::string("objectifyMe"));
- // Verify that we can export and import multiple deleted objects correctly.
- //
- QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
- {
- AgentFixture* fix = new AgentFixture(3);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- // populate the agent with multiple test objects
- const size_t objCount = 50;
- std::vector<TestManageable *> tmv;
- uint32_t objLen;
-
- for (size_t i = 0; i < objCount; i++) {
- std::stringstream key;
- key << "testobj-" << std::setfill('x') << std::setw(4) << i;
- // (no, seriously, I didn't just do that.)
- // Note well: we have to keep the key string length EXACTLY THE SAME
- // FOR ALL OBJECTS, so objLen will be the same. Otherwise the
- // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
- TestManageable *tm = new TestManageable(agent, key.str());
- objLen = tm->GetManagementObject()->writePropertiesSize();
- agent->addObject(tm->GetManagementObject(), i + 1);
- tmv.push_back(tm);
- }
+ // add, then immediately delete and export the object...
- // wait for the objects to be published
- Message m1;
- uint32_t msgCount = 0;
- while(r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- msgCount += objs.size();
- }
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->addObject(tm->GetManagementObject(), 999);
+ tm->GetManagementObject()->resourceDestroy();
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 1);
- BOOST_CHECK_EQUAL(msgCount, objCount);
+ delete fix;
+ delete tm;
+}
- // destroy some of the objects, then immediately export (before the next poll cycle)
- uint32_t delCount = 0;
- for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObject()->resourceDestroy();
- delCount++;
- }
+// Verify that we can export and import multiple deleted objects correctly.
+//
+QPID_AUTO_TEST_CASE(v1ImportMultiDelObj)
+{
+ AgentFixture* fix = new AgentFixture(3);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ // populate the agent with multiple test objects
+ const size_t objCount = 50;
+ std::vector<TestManageable *> tmv;
+ uint32_t objLen;
+
+ for (size_t i = 0; i < objCount; i++) {
+ std::stringstream key;
+ key << "testobj-" << std::setfill('x') << std::setw(4) << i;
+ // (no, seriously, I didn't just do that.)
+ // Note well: we have to keep the key string length EXACTLY THE SAME
+ // FOR ALL OBJECTS, so objLen will be the same. Otherwise the
+ // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length).
+ TestManageable *tm = new TestManageable(agent, key.str());
+ objLen = tm->GetManagementObject()->writePropertiesSize();
+ agent->addObject(tm->GetManagementObject(), i + 1);
+ tmv.push_back(tm);
+ }
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK_EQUAL(delObjs.size(), delCount);
+ // wait for the objects to be published
+ Message m1;
+ uint32_t msgCount = 0;
+ while(r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ msgCount += objs.size();
+ }
- // destroy the broker, and reinistantiate a new one without populating it
- // with TestObjects.
+ BOOST_CHECK_EQUAL(msgCount, objCount);
- r1.close();
- delete fix;
- while (tmv.size()) {
- delete tmv.back();
- tmv.pop_back();
- }
+ // destroy some of the objects, then immediately export (before the next poll cycle)
- fix = new AgentFixture(3);
- r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
+ uint32_t delCount = 0;
+ for (size_t i = 0; i < objCount; i += 2) {
+ tmv[i]->GetManagementObject()->resourceDestroy();
+ delCount++;
+ }
- // wait for the deleted object to be published, verify the count
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK_EQUAL(delObjs.size(), delCount);
- uint32_t countDels = 0;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV1ObjectUpdates(m1, objs, objLen);
- BOOST_CHECK(objs.size() > 0);
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with TestObjects.
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ r1.close();
+ delete fix;
+ while (tmv.size()) {
+ delete tmv.back();
+ tmv.pop_back();
+ }
- TestManageable::validateTestObjectProperties(**oIter);
+ fix = new AgentFixture(3);
+ r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- countDels++;
- }
- }
+ // wait for the deleted object to be published, verify the count
- // make sure we get the correct # of deleted objects
- BOOST_CHECK_EQUAL(countDels, delCount);
+ uint32_t countDels = 0;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV1ObjectUpdates(m1, objs, objLen);
+ BOOST_CHECK(objs.size() > 0);
- // verify there are no deleted objects to export now.
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ countDels++;
}
+ }
- // Verify that we can export and import multiple deleted objects correctly.
- // QMF V2 variant
- QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
- {
- AgentFixture* fix = new AgentFixture(3, true);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
-
- // populate the agent with multiple test objects
- const size_t objCount = 50;
- std::vector<TestManageable *> tmv;
-
- for (size_t i = 0; i < objCount; i++) {
- std::stringstream key;
- key << "testobj-" << i;
- TestManageable *tm = new TestManageable(agent, key.str());
- if (tm->GetManagementObject()->writePropertiesSize()) {}
- agent->addObject(tm->GetManagementObject(), key.str());
- tmv.push_back(tm);
- }
+ // make sure we get the correct # of deleted objects
+ BOOST_CHECK_EQUAL(countDels, delCount);
- // wait for the objects to be published
- Message m1;
- uint32_t msgCount = 0;
- while(r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV2ObjectUpdates(m1, objs);
- msgCount += objs.size();
- }
+ // verify there are no deleted objects to export now.
- BOOST_CHECK_EQUAL(msgCount, objCount);
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
- // destroy some of the objects, then immediately export (before the next poll cycle)
+ r1.close();
+ delete fix;
+}
- uint32_t delCount = 0;
- for (size_t i = 0; i < objCount; i += 2) {
- tmv[i]->GetManagementObject()->resourceDestroy();
- delCount++;
- }
+// Verify that we can export and import multiple deleted objects correctly.
+// QMF V2 variant
+QPID_AUTO_TEST_CASE(v2ImportMultiDelObj)
+{
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+
+ // populate the agent with multiple test objects
+ const size_t objCount = 50;
+ std::vector<TestManageable *> tmv;
+
+ for (size_t i = 0; i < objCount; i++) {
+ std::stringstream key;
+ key << "testobj-" << i;
+ TestManageable *tm = new TestManageable(agent, key.str());
+ if (tm->GetManagementObject()->writePropertiesSize()) {}
+ agent->addObject(tm->GetManagementObject(), key.str());
+ tmv.push_back(tm);
+ }
- ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK_EQUAL(delObjs.size(), delCount);
+ // wait for the objects to be published
+ Message m1;
+ uint32_t msgCount = 0;
+ while(r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV2ObjectUpdates(m1, objs);
+ msgCount += objs.size();
+ }
- // destroy the broker, and reinistantiate a new one without populating it
- // with TestObjects.
+ BOOST_CHECK_EQUAL(msgCount, objCount);
- r1.close();
- delete fix;
- while (tmv.size()) {
- delete tmv.back();
- tmv.pop_back();
- }
+ // destroy some of the objects, then immediately export (before the next poll cycle)
- fix = new AgentFixture(3, true);
- r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
- agent = fix->getBrokerAgent();
- agent->importDeletedObjects( delObjs );
+ uint32_t delCount = 0;
+ for (size_t i = 0; i < objCount; i += 2) {
+ tmv[i]->GetManagementObject()->resourceDestroy();
+ delCount++;
+ }
- // wait for the deleted object to be published, verify the count
+ ::qpid::management::ManagementAgent::DeletedObjectList delObjs;
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK_EQUAL(delObjs.size(), delCount);
- uint32_t countDels = 0;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
- TestObjectVector objs;
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
+ // destroy the broker, and reinistantiate a new one without populating it
+ // with TestObjects.
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ r1.close();
+ delete fix;
+ while (tmv.size()) {
+ delete tmv.back();
+ tmv.pop_back();
+ }
- TestManageable::validateTestObjectProperties(**oIter);
+ fix = new AgentFixture(3, true);
+ r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#");
+ agent = fix->getBrokerAgent();
+ agent->importDeletedObjects( delObjs );
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0)
- countDels++;
- }
- }
+ // wait for the deleted object to be published, verify the count
- // make sure we get the correct # of deleted objects
- BOOST_CHECK_EQUAL(countDels, delCount);
+ uint32_t countDels = 0;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+ TestObjectVector objs;
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
- // verify there are no deleted objects to export now.
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- agent->exportDeletedObjects( delObjs );
- BOOST_CHECK(delObjs.size() == 0);
+ TestManageable::validateTestObjectProperties(**oIter);
- r1.close();
- delete fix;
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0)
+ countDels++;
}
+ }
- // See QPID-2997
- QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
- {
- AgentFixture* fix = new AgentFixture(3, true);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // two objects, same ObjID
- TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
- TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
-
- Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
-
- // add, then immediately delete and re-add a copy of the object
- agent->addObject(tm1->GetManagementObject(), "testobj-1");
- tm1->GetManagementObject()->resourceDestroy();
- agent->addObject(tm2->GetManagementObject(), "testobj-1");
-
- // expect: a delete notification, then an update notification
- TestObjectVector objs;
- bool isDeleted = false;
- bool isAdvertised = false;
- size_t count = 0;
- Message m1;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
-
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- count++;
- TestManageable::validateTestObjectProperties(**oIter);
-
- qpid::types::Variant::Map mappy;
- (*oIter)->writeTimestamps(mappy);
- if (mappy["_delete_ts"].asUint64() != 0) {
- isDeleted = true;
- BOOST_CHECK(isAdvertised == false); // delete must be first
- } else {
- isAdvertised = true;
- BOOST_CHECK(isDeleted == true); // delete must be first
- }
- }
- }
+ // make sure we get the correct # of deleted objects
+ BOOST_CHECK_EQUAL(countDels, delCount);
- BOOST_CHECK(isDeleted);
- BOOST_CHECK(isAdvertised);
- BOOST_CHECK(count == 2);
+ // verify there are no deleted objects to export now.
- r1.close();
- delete fix;
- delete tm1;
- delete tm2;
- }
+ agent->exportDeletedObjects( delObjs );
+ BOOST_CHECK(delObjs.size() == 0);
- // See QPID-2997
- QPID_AUTO_TEST_CASE(v2DuplicateErrorObj)
- {
- AgentFixture* fix = new AgentFixture(3, true);
- management::ManagementAgent* agent;
- agent = fix->getBrokerAgent();
-
- // turn off the expected error log message
- qpid::log::Options logOpts;
- logOpts.selectors.clear();
- logOpts.selectors.push_back("critical+");
- qpid::log::Logger::instance().configure(logOpts);
-
- // two objects, same ObjID
- TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
- TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
- // Keep a pointer to the ManagementObject. This test simulates a user-caused error
- // case (duplicate objects) where the broker has no choice but to leak a management
- // object (safest assumption). To prevent valgrind from flagging this leak, we
- // manually clean up the object at the end of the test.
- management::ManagementObject *save = tm2->GetManagementObject();
-
- Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
-
- // add, then immediately delete and re-add a copy of the object
- agent->addObject(tm1->GetManagementObject(), "testobj-1");
- agent->addObject(tm2->GetManagementObject(), "testobj-1");
-
- TestObjectVector objs;
- size_t count = 0;
- Message m1;
- while (r1.fetch(m1, Duration::SECOND * 6)) {
-
- decodeV2ObjectUpdates(m1, objs);
- BOOST_CHECK(objs.size() > 0);
-
- for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
- count++;
- TestManageable::validateTestObjectProperties(**oIter);
- }
+ r1.close();
+ delete fix;
+}
+
+// See QPID-2997
+QPID_AUTO_TEST_CASE(v2RapidRestoreObj)
+{
+ AgentFixture* fix = new AgentFixture(3, true);
+ management::ManagementAgent* agent;
+ agent = fix->getBrokerAgent();
+
+ // two objects, same ObjID
+ TestManageable *tm1 = new TestManageable(agent, std::string("obj2"));
+ TestManageable *tm2 = new TestManageable(agent, std::string("obj2"));
+
+ Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#");
+
+ // add, then immediately delete and re-add a copy of the object
+ agent->addObject(tm1->GetManagementObject(), "testobj-1");
+ tm1->GetManagementObject()->resourceDestroy();
+ agent->addObject(tm2->GetManagementObject(), "testobj-1");
+
+ // expect: a delete notification, then an update notification
+ TestObjectVector objs;
+ bool isDeleted = false;
+ bool isAdvertised = false;
+ size_t count = 0;
+ Message m1;
+ while (r1.fetch(m1, Duration::SECOND * 6)) {
+
+ decodeV2ObjectUpdates(m1, objs);
+ BOOST_CHECK(objs.size() > 0);
+
+ for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) {
+ count++;
+ TestManageable::validateTestObjectProperties(**oIter);
+
+ qpid::types::Variant::Map mappy;
+ (*oIter)->writeTimestamps(mappy);
+ if (mappy["_delete_ts"].asUint64() != 0) {
+ isDeleted = true;
+ BOOST_CHECK(isAdvertised == false); // delete must be first
+ } else {
+ isAdvertised = true;
+ BOOST_CHECK(isDeleted == true); // delete must be first
}
+ }
+ }
- BOOST_CHECK(count == 1); // only one should be accepted.
+ BOOST_CHECK(isDeleted);
+ BOOST_CHECK(isAdvertised);
+ BOOST_CHECK(count == 2);
- r1.close();
- delete fix;
- delete tm1;
- delete tm2;
- delete save;
- }
+ r1.close();
+ delete fix;
+ delete tm1;
+ delete tm2;
+}
- QPID_AUTO_TEST_SUITE_END()
- }
+QPID_AUTO_TEST_SUITE_END()
+}
}
diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp
index e6010a8e00..d538a8181c 100644
--- a/qpid/cpp/src/tests/testagent.cpp
+++ b/qpid/cpp/src/tests/testagent.cpp
@@ -59,7 +59,7 @@ class CoreClass : public Manageable
{
string name;
ManagementAgent* agent;
- _qmf::Parent* mgmtObject;
+ _qmf::Parent::shared_ptr mgmtObject;
std::vector<ChildClass*> children;
Mutex vectorLock;
@@ -68,7 +68,7 @@ public:
CoreClass(ManagementAgent* agent, string _name);
~CoreClass() { mgmtObject->resourceDestroy(); }
- ManagementObject* GetManagementObject(void) const
+ ManagementObject::shared_ptr GetManagementObject(void) const
{ return mgmtObject; }
void doLoop();
@@ -78,14 +78,14 @@ public:
class ChildClass : public Manageable
{
string name;
- _qmf::Child* mgmtObject;
+ _qmf::Child::shared_ptr mgmtObject;
public:
ChildClass(ManagementAgent* agent, CoreClass* parent, string name);
~ChildClass() { mgmtObject->resourceDestroy(); }
- ManagementObject* GetManagementObject(void) const
+ ManagementObject::shared_ptr GetManagementObject(void) const
{ return mgmtObject; }
void doWork()
@@ -97,9 +97,9 @@ public:
CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
{
static uint64_t persistId = 0x111222333444555LL;
- mgmtObject = new _qmf::Parent(agent, this, name);
+ mgmtObject = _qmf::Parent::shared_ptr(new _qmf::Parent(agent, this, name));
- agent->addObject(mgmtObject, persistId++);
+ agent->addObject(mgmtObject.get(), persistId++);
mgmtObject->set_state("IDLE");
}
@@ -146,9 +146,9 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args,
ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
{
- mgmtObject = new _qmf::Child(agent, this, parent, name);
+ mgmtObject = _qmf::Child::shared_ptr(new _qmf::Child(agent, this, parent, name));
- agent->addObject(mgmtObject);
+ agent->addObject(mgmtObject.get());
}