diff options
| author | Alan Conway <aconway@apache.org> | 2012-10-15 21:35:38 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-10-15 21:35:38 +0000 |
| commit | 0d533491f135278fda9fa8c10d45e11da0c04d03 (patch) | |
| tree | c097eec9b651dcf12e84fad3eca1b8d433c9ad14 /qpid/cpp/src | |
| parent | 38f2ef8faa174042d11eae77c50a606d0063ed8c (diff) | |
| download | qpid-python-0d533491f135278fda9fa8c10d45e11da0c04d03.tar.gz | |
MQPID-4286: QMF queries for HA replication take too long to process (Jason Dillaman)
Rework ManagementAgent locks, get rid of shared buffers that were points of contention.
Minor log message improvements in ha code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1398530 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
35 files changed, 1118 insertions, 1220 deletions
diff --git a/qpid/cpp/src/posix/QpiddBroker.cpp b/qpid/cpp/src/posix/QpiddBroker.cpp index f1f9009568..a681a6d18d 100644 --- a/qpid/cpp/src/posix/QpiddBroker.cpp +++ b/qpid/cpp/src/posix/QpiddBroker.cpp @@ -144,7 +144,7 @@ struct QpiddDaemon : public Daemon { uint16_t port=brokerPtr->getPort(options->daemon.transport); ready(port); // Notify parent. if (options->parent->broker.enableMgmt && (options->parent->broker.port == 0 || options->daemon.transport != TCP)) { - dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } brokerPtr->run(); } @@ -200,7 +200,7 @@ int QpiddBroker::execute (QpiddOptions *options) { uint16_t port = brokerPtr->getPort(myOptions->daemon.transport); cout << port << endl; if (options->broker.enableMgmt) { - dynamic_cast<qmf::org::apache::qpid::broker::Broker*>(brokerPtr->GetManagementObject())->set_port(port); + boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(brokerPtr->GetManagementObject())->set_port(port); } } brokerPtr->run(); diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index cd79add720..61e0b56104 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -52,7 +52,7 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::acl; -Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), mgmtObject(0), +Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(false), connectionCounter(new ConnectionCounter(*this, aclValues.aclMaxConnectPerUser, aclValues.aclMaxConnectPerIp, aclValues.aclMaxConnectTotal)), resourceCounter(new ResourceCounter(*this, aclValues.aclMaxQueuesPerUser)){ @@ -60,7 +60,7 @@ Acl::Acl (AclValues& av, Broker& b): aclValues(av), broker(&b), transferAcl(fals if (agent != 0){ _qmf::Package packageInit(agent); - mgmtObject = new _qmf::Acl (agent, this, broker); + mgmtObject = _qmf::Acl::shared_ptr(new _qmf::Acl (agent, this, broker)); agent->addObject (mgmtObject); mgmtObject->set_maxConnections(aclValues.aclMaxConnectTotal); mgmtObject->set_maxConnectionsPerIp(aclValues.aclMaxConnectPerIp); @@ -317,9 +317,9 @@ Acl::~Acl(){ broker->getConnectionObservers().remove(connectionCounter); } -ManagementObject* Acl::GetManagementObject(void) const +ManagementObject::shared_ptr Acl::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Acl::ManagementMethod (uint32_t methodId, Args& args, string& text) diff --git a/qpid/cpp/src/qpid/acl/Acl.h b/qpid/cpp/src/qpid/acl/Acl.h index a23952ff53..28cbfb8f3f 100644 --- a/qpid/cpp/src/qpid/acl/Acl.h +++ b/qpid/cpp/src/qpid/acl/Acl.h @@ -62,7 +62,7 @@ private: broker::Broker* broker; bool transferAcl; boost::shared_ptr<AclData> data; - qmf::org::apache::qpid::acl::Acl* mgmtObject; // mgnt owns lifecycle + qmf::org::apache::qpid::acl::Acl::shared_ptr mgmtObject; qpid::management::ManagementAgent* agent; mutable qpid::sys::Mutex dataLock; boost::shared_ptr<ConnectionCounter> connectionCounter; @@ -113,7 +113,7 @@ private: bool readAclFile(std::string& aclFile, std::string& errorText); Manageable::status_t lookup (management::Args& args, std::string& text); Manageable::status_t lookupPublish(management::Args& args, std::string& text); - virtual qpid::management::ManagementObject* GetManagementObject(void) const; + virtual qpid::management::ManagementObject::shared_ptr GetManagementObject(void) const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); }; diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index d1706b5907..dfc99bb834 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -60,7 +60,7 @@ void Bridge::PushHandler::handle(framing::AMQFrame& frame) Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, CancellationListener l, const _qmf::ArgsLinkBridge& _args, InitializeCallback init, const std::string& _queueName, const string& ae) : - link(_link), channel(_id), args(_args), mgmtObject(0), + link(_link), channel(_id), args(_args), listener(l), name(_name), queueName(_queueName.empty() ? "qpid.bridge_queue_" + name + "_" + link->getBroker()->getFederationTag() : _queueName), @@ -71,10 +71,10 @@ Bridge::Bridge(const std::string& _name, Link* _link, framing::ChannelId _id, { ManagementAgent* agent = link->getBroker()->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Bridge + mgmtObject = _qmf::Bridge::shared_ptr(new _qmf::Bridge (agent, this, link, name, args.i_durable, args.i_src, args.i_dest, args.i_key, args.i_srcIsQueue, args.i_srcIsLocal, - args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); + args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync)); mgmtObject->set_channelId(channel); agent->addObject(mgmtObject); } @@ -296,9 +296,9 @@ uint32_t Bridge::encodedSize() const + 2; // sync } -management::ManagementObject* Bridge::GetManagementObject (void) const +management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const { - return (management::ManagementObject*) mgmtObject; + return mgmtObject; } management::Manageable::status_t Bridge::ManagementMethod(uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index ee298afd45..2b4d019ccd 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -72,7 +72,7 @@ class Bridge : public PersistableConfig, bool isDetached() const { return detached; } - management::ManagementObject* GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); @@ -128,7 +128,7 @@ class Bridge : public PersistableConfig, Link* const link; const framing::ChannelId channel; qmf::org::apache::qpid::broker::ArgsLinkBridge args; - qmf::org::apache::qpid::broker::Bridge* mgmtObject; + qmf::org::apache::qpid::broker::Bridge::shared_ptr mgmtObject; CancellationListener listener; std::string name; std::string queueName; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 32b0fdd46e..0c466aea07 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -214,7 +214,6 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - mgmtObject(0), queueCleaner(queues, &timer), recoveryInProgress(false), recovery(true), @@ -235,7 +234,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); systemObject = System::shared_ptr(system); - mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); + mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); @@ -450,9 +449,9 @@ Broker::~Broker() { QPID_LOG(notice, "Shut down"); } -ManagementObject* Broker::GetManagementObject(void) const +ManagementObject::shared_ptr Broker::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable* Broker::GetVhostObject(void) const diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 9b90330cb9..11757657b8 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -171,7 +171,7 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::shared_ptr<sys::ConnectionCodec::Factory> factory; DtxManager dtxManager; SessionManager sessionManager; - qmf::org::apache::qpid::broker::Broker* mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject; Vhost::shared_ptr vhostObject; System::shared_ptr systemObject; QueueCleaner queueCleaner; @@ -230,7 +230,7 @@ class Broker : public sys::Runnable, public Plugin::Target, SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const; QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const; QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod( uint32_t methodId, management::Args& args, std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 08a9c756d0..043325a4fa 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -97,7 +97,6 @@ Connection::Connection(ConnectionOutputHandler* out_, link(link_), mgmtClosing(false), mgmtId(mgmtId_), - mgmtObject(0), links(broker_.getLinks()), agent(0), timer(broker_.getTimer()), @@ -119,7 +118,7 @@ void Connection::addManagementObject() { agent = broker.getManagementAgent(); if (agent != 0) { // TODO set last bool true if system connection - mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !link, false); + mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false)); mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId); } @@ -403,9 +402,9 @@ SessionHandler& Connection::getChannel(ChannelId id) { return *ptr_map_ptr(i); } -ManagementObject* Connection::GetManagementObject(void) const +ManagementObject::shared_ptr Connection::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Connection::ManagementMethod(uint32_t methodId, Args&, string&) diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index d01599ce54..3ef9877750 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -112,7 +112,7 @@ class Connection : public sys::ConnectionInputHandler, void closeChannel(framing::ChannelId channel); // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); @@ -196,7 +196,7 @@ class Connection : public sys::ConnectionInputHandler, const std::string mgmtId; sys::Mutex ioCallbackLock; std::queue<boost::function0<void> > ioCallbacks; - qmf::org::apache::qpid::broker::Connection* mgmtObject; + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; sys::Timer& timer; @@ -231,7 +231,7 @@ class Connection : public sys::ConnectionInputHandler, public: - qmf::org::apache::qpid::broker::Connection* getMgmtObject() { return mgmtObject; } + qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } }; }} diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index a484cc054e..d1dd1fae6c 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -150,7 +150,7 @@ void ConnectionHandler::Handler::startOk(const framing::FieldTable& /*clientProp void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) { const framing::FieldTable& clientProperties = body.getClientProperties(); - qmf::org::apache::qpid::broker::Connection* mgmtObject = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject(); if (mgmtObject != 0) { string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME); diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index bb5dc2b807..12360df81d 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -165,19 +165,19 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), alternateUsers(0), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) + sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } } @@ -185,20 +185,20 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) + args(_args), sequence(false), sequenceNo(0), ive(false), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); + mgmtExchange = _qmf::Exchange::shared_ptr(new _qmf::Exchange (agent, this, parent, _name)); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); if (broker) - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<qmf::org::apache::qpid::broker::Broker>(broker->GetManagementObject()); } } @@ -294,9 +294,9 @@ void Exchange::recoveryComplete(ExchangeRegistry& exchanges) } } -ManagementObject* Exchange::GetManagementObject (void) const +ManagementObject::shared_ptr Exchange::GetManagementObject (void) const { - return (ManagementObject*) mgmtExchange; + return mgmtExchange; } void Exchange::registerDynamicBridge(DynamicBridge* db) @@ -345,16 +345,16 @@ void Exchange::propagateFedOp(const string& routingKey, const string& tags, cons Exchange::Binding::Binding(const string& _key, Queue::shared_ptr _queue, Exchange* _parent, FieldTable _args, const string& _origin) - : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin), mgmtBinding(0) + : parent(_parent), queue(_queue), key(_key), args(_args), origin(_origin) { } Exchange::Binding::~Binding () { if (mgmtBinding != 0) { - ManagementObject* mo = queue->GetManagementObject(); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) - static_cast<_qmf::Queue*>(mo)->dec_bindingCount(); + mo->dec_bindingCount(); mgmtBinding->resourceDestroy (); } } @@ -367,25 +367,25 @@ void Exchange::Binding::startManagement() if (broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); if (agent != 0) { - ManagementObject* mo = queue->GetManagementObject(); + _qmf::Queue::shared_ptr mo = boost::dynamic_pointer_cast<_qmf::Queue>(queue->GetManagementObject()); if (mo != 0) { management::ObjectId queueId = mo->getObjectId(); - mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); + mgmtBinding = _qmf::Binding::shared_ptr(new _qmf::Binding + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args))); if (!origin.empty()) mgmtBinding->set_origin(origin); agent->addObject(mgmtBinding); - static_cast<_qmf::Queue*>(mo)->inc_bindingCount(); + mo->inc_bindingCount(); } } } } } -ManagementObject* Exchange::Binding::GetManagementObject () const +ManagementObject::shared_ptr Exchange::Binding::GetManagementObject () const { - return (ManagementObject*) mgmtBinding; + return mgmtBinding; } Exchange::MatchQueue::MatchQueue(Queue::shared_ptr q) : queue(q) {} diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h index 26ee5b0054..517b551a83 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.h +++ b/qpid/cpp/src/qpid/broker/Exchange.h @@ -52,13 +52,13 @@ public: const std::string key; const framing::FieldTable args; std::string origin; - qmf::org::apache::qpid::broker::Binding* mgmtBinding; + qmf::org::apache::qpid::broker::Binding::shared_ptr mgmtBinding; Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); void startManagement(); - management::ManagementObject* GetManagementObject() const; + management::ManagementObject::shared_ptr GetManagementObject() const; }; private: @@ -159,8 +159,8 @@ protected: } }; - qmf::org::apache::qpid::broker::Exchange* mgmtExchange; - qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; + qmf::org::apache::qpid::broker::Exchange::shared_ptr mgmtExchange; + qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; public: typedef boost::shared_ptr<Exchange> shared_ptr; @@ -210,7 +210,7 @@ public: static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer); // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; // Federation hooks class DynamicBridge { diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index afa5623ecd..b014217180 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -143,7 +143,7 @@ Link::Link(const string& _name, host(_host), port(_port), transport(_transport), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), - persistenceId(0), mgmtObject(0), broker(_broker), state(0), + persistenceId(0), broker(_broker), state(0), visitCount(0), currentInterval(1), closing(false), @@ -161,7 +161,7 @@ Link::Link(const string& _name, agent = broker->getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Link(agent, this, parent, name, durable); + mgmtObject = _qmf::Link::shared_ptr(new _qmf::Link(agent, this, parent, name, durable)); mgmtObject->set_host(host); mgmtObject->set_port(port); mgmtObject->set_transport(transport); @@ -638,9 +638,9 @@ uint32_t Link::encodedSize() const + password.size() + 1; } -ManagementObject* Link::GetManagementObject (void) const +ManagementObject::shared_ptr Link::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } void Link::close() { diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index f0cb90e73b..49ee3ef36a 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -69,7 +69,7 @@ class Link : public PersistableConfig, public management::Manageable { std::string username; std::string password; mutable uint64_t persistenceId; - qmf::org::apache::qpid::broker::Link* mgmtObject; + qmf::org::apache::qpid::broker::Link::shared_ptr mgmtObject; Broker* broker; int state; uint32_t visitCount; @@ -181,7 +181,7 @@ class Link : public PersistableConfig, public management::Manageable { static bool isEncodedLink(const std::string& key); // Manageable entry points - management::ManagementObject* GetManagementObject(void) const; + management::ManagementObject::shared_ptr GetManagementObject(void) const; management::Manageable::status_t ManagementMethod(uint32_t, management::Args&, std::string&); // manage the exchange owned by this link diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 9cf2f541ce..9dc9ec7a6d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -77,8 +77,8 @@ namespace { inline void mgntEnqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0) { _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -101,8 +101,8 @@ inline void mgntEnqStats(const Message& msg, } inline void mgntDeqStats(const Message& msg, - _qmf::Queue* mgmtObject, - _qmf::Broker* brokerMgmtObject) + _qmf::Queue::shared_ptr mgmtObject, + _qmf::Broker::shared_ptr brokerMgmtObject) { if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); @@ -179,8 +179,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, messages(new MessageDeque()), persistenceId(0), settings(b ? merge(_settings, b->getOptions()) : _settings), - mgmtObject(0), - brokerMgmtObject(0), eventMode(0), broker(b), deleted(false), @@ -195,17 +193,17 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, qpid::amqp_0_10::translate(settings.asMap(), encodableSettings); if (parent != 0 && broker != 0) { ManagementAgent* agent = broker->getManagementAgent(); - if (agent != 0) { - mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete); + mgmtObject = _qmf::Queue::shared_ptr( + new _qmf::Queue(agent, this, parent, _name, _store != 0, settings.autodelete)); mgmtObject->set_arguments(settings.asMap()); agent->addObject(mgmtObject, 0, store != 0); - brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + brokerMgmtObject = boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject()); if (brokerMgmtObject) brokerMgmtObject->inc_queueCount(); } } - + if ( settings.isBrowseOnly ) { QPID_LOG ( info, "Queue " << name << " is browse-only." ); } @@ -213,11 +211,6 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, Queue::~Queue() { - if (mgmtObject != 0) { - mgmtObject->resourceDestroy(); - if (brokerMgmtObject) - brokerMgmtObject->dec_queueCount(); - } } bool isLocalTo(const OwnershipToken* token, const Message& msg) @@ -1076,6 +1069,12 @@ void Queue::destroyed() boost::bind(&QueueObserver::destroy, _1)); observers.clear(); } + + if (mgmtObject != 0) { + mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } void Queue::notifyDeleted() @@ -1109,7 +1108,7 @@ void Queue::setPersistenceId(uint64_t _persistenceId) const { if (mgmtObject != 0 && persistenceId == 0 && externalQueueStore) { - ManagementObject* childObj = externalQueueStore->GetManagementObject(); + ManagementObject::shared_ptr childObj = externalQueueStore->GetManagementObject(); if (childObj != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1263,7 +1262,7 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { externalQueueStore = inst; if (inst) { - ManagementObject* childObj = inst->GetManagementObject(); + ManagementObject::shared_ptr childObj = inst->GetManagementObject(); if (childObj != 0 && mgmtObject != 0) childObj->setReference(mgmtObject->getObjectId()); } @@ -1311,9 +1310,9 @@ void Queue::countLoadedFromDisk(uint64_t size) const } -ManagementObject* Queue::GetManagementObject (void) const +ManagementObject::shared_ptr Queue::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t Queue::ManagementMethod (uint32_t methodId, Args& args, string& etext) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index f6ea6e0adb..32e9201b5b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -151,8 +151,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, std::string alternateExchangeName; boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; - qmf::org::apache::qpid::broker::Queue* mgmtObject; - qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; + qmf::org::apache::qpid::broker::Queue::shared_ptr mgmtObject; + qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; @@ -339,7 +339,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, QPID_BROKER_EXTERN void countLoadedFromDisk(uint64_t size) const; // Manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject (void) const; + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t QPID_BROKER_EXTERN ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); QPID_BROKER_EXTERN void query(::qpid::types::Variant::Map&) const; diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index c52cdee6a4..944cc7e838 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -68,7 +68,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) + flowStopped(false), count(0), size(0), broker(0) { uint32_t maxCount(0); uint64_t maxSize(0); @@ -78,7 +78,7 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); broker = queue->getBroker(); - queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); if (queueMgmtObj) { queueMgmtObj->set_flowStopped(isFlowControlActive()); } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index 1bcc388ceb..0e83457efa 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -31,14 +31,8 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" +#include "qmf/org/apache/qpid/broker/Queue.h" -namespace qmf { -namespace org { -namespace apache { -namespace qpid { -namespace broker { - class Queue; -}}}}} namespace _qmfBroker = qmf::org::apache::qpid::broker; namespace qpid { @@ -118,7 +112,7 @@ struct QueueSettings; std::map<framing::SequenceNumber, Message > index; mutable qpid::sys::Mutex indexLock; - _qmfBroker::Queue *queueMgmtObj; + _qmfBroker::Queue::shared_ptr queueMgmtObj; const Broker *broker; diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index 2d7c820b63..bc7c96f08c 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -424,7 +424,7 @@ void CyrusAuthenticator::start(const string& mechanism, const string* response) &challenge, &challenge_len); processAuthenticationStep(code, challenge, challenge_len); - qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); if ( cnxMgmt ) cnxMgmt->set_saslMechanism(mechanism); } @@ -507,7 +507,7 @@ std::auto_ptr<SecurityLayer> CyrusAuthenticator::getSecurityLayer(uint16_t maxFr if (ssf) { securityLayer = std::auto_ptr<SecurityLayer>(new CyrusSecurityLayer(sasl_conn, maxFrameSize)); } - qmf::org::apache::qpid::broker::Connection* cnxMgmt = connection.getMgmtObject(); + qmf::org::apache::qpid::broker::Connection::shared_ptr cnxMgmt = connection.getMgmtObject(); if ( cnxMgmt ) cnxMgmt->set_saslSsf(ssf); return securityLayer; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 65530394a3..f3d97dc078 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -300,8 +300,7 @@ Consumer(_name, type), arguments(_arguments), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0), - mgmtObject(0) + deliveryCount(0) { if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) { @@ -310,17 +309,17 @@ Consumer(_name, type), if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), - !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); + mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); } } } -ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 67cfe808d0..1d7ccbfa9c 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -96,7 +96,7 @@ class SemanticState : private boost::noncopyable { bool notifyEnabled; const int syncFrequency; int deliveryCount; - qmf::org::apache::qpid::broker::Subscription* mgmtObject; + qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; bool checkCredit(const Message& msg); void allocateCredit(const Message& msg); @@ -160,7 +160,7 @@ class SemanticState : private boost::noncopyable { void acknowledged(const DeliveryRecord&) {} // manageable entry points - QPID_BROKER_EXTERN management::ManagementObject* + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject(void) const; QPID_BROKER_EXTERN management::Manageable::status_t diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index fe357eb949..42be45f7ce 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -58,7 +58,6 @@ SessionState::SessionState( broker(b), handler(&h), semanticState(*this), adapter(semanticState), - mgmtObject(0), asyncCommandCompleter(new AsyncCommandCompleter(this)) { if (!delayManagement) addManagementObject(); @@ -71,8 +70,8 @@ void SessionState::addManagementObject() { if (parent != 0) { ManagementAgent* agent = getBroker().getManagementAgent(); if (agent != 0) { - mgmtObject = new _qmf::Session - (agent, this, parent, getId().getName()); + mgmtObject = _qmf::Session::shared_ptr(new _qmf::Session + (agent, this, parent, getId().getName())); mgmtObject->set_attached (0); mgmtObject->set_detachedLifespan (0); mgmtObject->clr_expireTime(); @@ -149,9 +148,9 @@ void SessionState::giveReadCredit(int32_t credit) { getConnection().outputTasks.giveReadCredit(credit); } -ManagementObject* SessionState::GetManagementObject (void) const +ManagementObject::shared_ptr SessionState::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 5e3a77d7ed..af384ff761 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -110,7 +110,7 @@ class SessionState : public qpid::SessionState, const qpid::types::Variant::Map& annotations, bool sync); // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; + management::ManagementObject::shared_ptr GetManagementObject (void) const; management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string&); @@ -168,7 +168,7 @@ class SessionState : public qpid::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; - qmf::org::apache::qpid::broker::Session* mgmtObject; + qmf::org::apache::qpid::broker::Session::shared_ptr mgmtObject; qpid::framing::SequenceSet accepted; // sequence numbers for pending received Execution.Sync commands diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index fa8df6406b..8d54427fdc 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -31,7 +31,7 @@ using namespace qpid::broker; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; -System::System (string _dataDir, Broker* broker) : mgmtObject(0) +System::System (string _dataDir, Broker* broker) { ManagementAgent* agent = broker ? broker->getManagementAgent() : 0; @@ -64,7 +64,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } } - mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); + mgmtObject = _qmf::System::shared_ptr(new _qmf::System(agent, this, types::Uuid(systemId.c_array()))); qpid::sys::SystemInfo::getSystemId (osName, nodeName, release, diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 6847c662ae..591d2a14a6 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -35,7 +35,7 @@ class System : public management::Manageable { private: - qmf::org::apache::qpid::broker::System* mgmtObject; + qmf::org::apache::qpid::broker::System::shared_ptr mgmtObject; framing::Uuid systemId; std::string osName, nodeName, release, version, machine; @@ -45,7 +45,7 @@ class System : public management::Manageable System (std::string _dataDir, Broker* broker = 0); - management::ManagementObject* GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } diff --git a/qpid/cpp/src/qpid/broker/Vhost.cpp b/qpid/cpp/src/qpid/broker/Vhost.cpp index a9ca3b42ab..e72118b570 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.cpp +++ b/qpid/cpp/src/qpid/broker/Vhost.cpp @@ -29,7 +29,7 @@ namespace qpid { namespace management { class Manageable; }} -Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmtObject(0) +Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) { if (parentBroker != 0 && broker != 0) { @@ -37,7 +37,7 @@ Vhost::Vhost (qpid::management::Manageable* parentBroker, Broker* broker) : mgmt if (agent != 0) { - mgmtObject = new _qmf::Vhost(agent, this, parentBroker, "/"); + mgmtObject = _qmf::Vhost::shared_ptr(new _qmf::Vhost(agent, this, parentBroker, "/")); agent->addObject(mgmtObject, 0, true); } } diff --git a/qpid/cpp/src/qpid/broker/Vhost.h b/qpid/cpp/src/qpid/broker/Vhost.h index 9554d641c2..599b821870 100644 --- a/qpid/cpp/src/qpid/broker/Vhost.h +++ b/qpid/cpp/src/qpid/broker/Vhost.h @@ -32,7 +32,7 @@ class Vhost : public management::Manageable { private: - qmf::org::apache::qpid::broker::Vhost* mgmtObject; + qmf::org::apache::qpid::broker::Vhost::shared_ptr mgmtObject; public: @@ -40,7 +40,7 @@ class Vhost : public management::Manageable Vhost (management::Manageable* parentBroker, Broker* broker = 0); - management::ManagementObject* GetManagementObject (void) const + management::ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } void setFederationTag(const std::string& tag); }; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 877fa021d0..c3d0598249 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -64,7 +64,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) systemId(broker.getSystem()->getSystemId().data()), settings(s), observer(new ConnectionObserver(*this, systemId)), - mgmtObject(0), status(STANDALONE), membership(systemId), replicationTest(s.replicateDefault.get()) @@ -95,7 +94,7 @@ void HaBroker::initialize() { if (settings.cluster && !ma) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker")); mgmtObject->set_replicateDefault(settings.replicateDefault.str()); mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 48db0a8d3c..530211ced4 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -71,7 +71,7 @@ class HaBroker : public management::Manageable void initialize(); // Implement Manageable. - qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } + qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); @@ -124,7 +124,7 @@ class HaBroker : public management::Manageable boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary boost::shared_ptr<Backup> backup; boost::shared_ptr<Primary> primary; - qmf::org::apache::qpid::ha::HaBroker* mgmtObject; + qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; BrokerStatus status; diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index ce4e545c80..b933c71bbb 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -91,9 +91,8 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { void RemoteBackup::ready(const QueuePtr& q) { catchupQueues.erase(q); - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << QueueSetPrinter(", waiting for: ", catchupQueues)); - if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", " + << catchupQueues.size() << " remain to catch up"); } // Called via ConfigurationObserver::queueCreate and from catchupQueue diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp index 01fceb7783..17613ce3dd 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp +++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp @@ -87,8 +87,8 @@ void StatusCheckThread::run() { string status = details["status"].getString(); if (status != "joining") { statusCheck.setPromote(false); - QPID_LOG(error, statusCheck.logPrefix << "Broker " << url << " status is " << status - << " this broker will refuse promotion."); + QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is " + << status << ", this broker will refuse promotion."); } QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 474c86ed48..8e19b68284 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -62,6 +62,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace { + const size_t qmfV1BufferSize(65536); const string defaultVendorName("vendor"); const string defaultProductName("product"); @@ -113,9 +114,8 @@ ManagementAgent::RemoteAgent::~RemoteAgent () QPID_LOG(debug, "Remote Agent removed bank=[" << brokerBank << "." << agentBank << "]"); if (mgmtObject != 0) { mgmtObject->resourceDestroy(); - agent.deleteObjectNowLH(mgmtObject->getObjectId()); - delete mgmtObject; - mgmtObject = 0; + agent.deleteObjectNow(mgmtObject->getObjectId()); + mgmtObject.reset(); } } @@ -124,8 +124,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), - qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE), memstat(0) + qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100) { nextObjectId = 1; brokerBank = 1; @@ -136,7 +135,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : attrMap["_vendor"] = defaultVendorName; attrMap["_product"] = defaultProductName; - memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); + memstat = _qmf::Memory::shared_ptr(new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker")); addObject(memstat, "amqp-broker"); } @@ -155,15 +154,6 @@ ManagementAgent::~ManagementAgent () v2Direct.reset(); remoteAgents.clear(); - - moveNewObjectsLH(); - for (ManagementObjectMap::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) { - ManagementObject* object = iter->second; - delete object; - } - managementObjects.clear(); } } @@ -316,11 +306,12 @@ void ManagementAgent::registerEvent (const string& packageName, } // Deprecated: V1 objects -ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId, bool persistent) +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, uint64_t persistId, bool persistent) { uint16_t sequence; uint64_t objectNum; + sys::Mutex::ScopedLock lock(addLock); sequence = persistent ? 0 : bootSequence; objectNum = persistId ? persistId : nextObjectId++; @@ -329,17 +320,14 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId object->setObjectId(objId); - { - sys::Mutex::ScopedLock lock(addLock); - newManagementObjects.push_back(object); - } + newManagementObjects.push_back(object); QPID_LOG(debug, "Management object (V1) added: " << objId.getV2Key()); return objId; } -ObjectId ManagementAgent::addObject(ManagementObject* object, +ObjectId ManagementAgent::addObject(ManagementObject::shared_ptr object, const string& key, bool persistent) { @@ -369,12 +357,11 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi "emerg", "alert", "crit", "error", "warn", "note", "info", "debug" }; - sys::Mutex::ScopedLock lock (userLock); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; if (qmf1Support) { - Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + char buffer[qmfV1BufferSize]; + Buffer outBuffer(buffer, qmfV1BufferSize); encodeHeader(outBuffer, 'e'); outBuffer.putShortString(event.getPackageName()); @@ -385,9 +372,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi string sBuf; event.encode(sBuf); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, + sendBuffer(outBuffer, mExchange, "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); QPID_LOG(debug, "SEND raiseEvent (v1) class=" << event.getPackageName() << "." << event.getEventName()); } @@ -426,7 +411,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi Variant::List list_; list_.push_back(map_); ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str()); QPID_LOG(debug, "SEND raiseEvent (v2) class=" << event.getPackageName() << "." << event.getEventName()); } } @@ -480,12 +465,9 @@ void ManagementAgent::clientAdded (const string& routingKey) while (rkeys.size()) { char localBuffer[16]; Buffer outBuffer(localBuffer, 16); - uint32_t outLen; encodeHeader(outBuffer, 'x'); - outLen = outBuffer.getPosition(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, rkeys.front()); + sendBuffer(outBuffer, dExchange, rkeys.front()); QPID_LOG(debug, "SEND ConsoleAddedIndication to=" << rkeys.front()); rkeys.pop_front(); } @@ -496,8 +478,8 @@ void ManagementAgent::clusterUpdate() { // Set clientWasAdded so that on the next periodicProcessing we will do // a full update on all cluster members. sys::Mutex::ScopedLock l(userLock); - moveNewObjectsLH(); // keep lists consistent with updater/updatee. - moveDeletedObjectsLH(); + moveNewObjects(); // keep lists consistent with updater/updatee. + moveDeletedObjects(); clientWasAdded = true; debugSnapshot("Cluster member joined"); } @@ -523,12 +505,9 @@ bool ManagementAgent::checkHeader (Buffer& buf, uint8_t *opcode, uint32_t *seq) return h1 == 'A' && h2 == 'M' && h3 == '2'; } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey) { if (suppressed) { QPID_LOG(debug, "Suppressing management message to " << routingKey); @@ -541,6 +520,8 @@ void ManagementAgent::sendBufferLH(Buffer& buf, AMQFrame header((AMQHeaderBody())); AMQFrame content((AMQContentBody())); + size_t length = buf.getPosition(); + buf.reset(); content.castBody<AMQContentBody>()->decode(buf, length); method.setEof(false); @@ -564,38 +545,31 @@ void ManagementAgent::sendBufferLH(Buffer& buf, Message msg(transfer, transfer); msg.setIsManagementMessage(true); - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + DeliverableMessage deliverable (msg, 0); + try { + exchange->route(deliverable); + } catch(exception&) {} buf.reset(); } -void ManagementAgent::sendBufferLH(Buffer& buf, - uint32_t length, - const string& exchange, - const string& routingKey) +void ManagementAgent::sendBuffer(Buffer& buf, + const string& exchange, + const string& routingKey) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(buf, length, ex, routingKey); + sendBuffer(buf, ex, routingKey); } -// NOTE WELL: assumes userLock is held by caller (LH) -// NOTE EVEN WELLER: drops this lock when delivering the message!!! -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const string& routingKey, + uint64_t ttl_msec) { Variant::Map::const_iterator i; @@ -643,34 +617,30 @@ void ManagementAgent::sendBufferLH(const string& data, msg.setIsManagementMessage(true); msg.computeExpiration(broker->getExpiryPolicy()); - { - sys::Mutex::ScopedUnlock u(userLock); - - DeliverableMessage deliverable (msg, 0); - try { - exchange->route(deliverable); - } catch(exception&) {} - } + DeliverableMessage deliverable (msg,0); + try { + exchange->route(deliverable); + } catch(exception&) {} } -void ManagementAgent::sendBufferLH(const string& data, - const string& cid, - const Variant::Map& headers, - const string& content_type, - const string& exchange, - const string& routingKey, - uint64_t ttl_msec) +void ManagementAgent::sendBuffer(const string& data, + const string& cid, + const Variant::Map& headers, + const string& content_type, + const string& exchange, + const string& routingKey, + uint64_t ttl_msec) { qpid::broker::Exchange::shared_ptr ex(broker->getExchanges().get(exchange)); if (ex.get() != 0) - sendBufferLH(data, cid, headers, content_type, ex, routingKey, ttl_msec); + sendBuffer(data, cid, headers, content_type, ex, routingKey, ttl_msec); } /** Objects that have been added since the last periodic poll are temporarily * saved in the newManagementObjects list. This allows objects to be - * added without needing to block on the userLock (addLock is used instead). + * added without needing to block on the userLock (objectLock is used instead). * These new objects need to be integrated into the object database * (managementObjects) *before* they can be properly managed. This routine * performs the integration. @@ -680,34 +650,33 @@ void ManagementAgent::sendBufferLH(const string& data, * duplicate object ids. To avoid clashes, don't put deleted objects * into the active object database. */ -void ManagementAgent::moveNewObjectsLH() +void ManagementAgent::moveNewObjects() { - sys::Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock objLock (objectLock); while (!newManagementObjects.empty()) { - ManagementObject *object = newManagementObjects.back(); + ManagementObject::shared_ptr object = newManagementObjects.back(); newManagementObjects.pop_back(); if (object->isDeleted()) { DeletedObject::shared_ptr dptr(new DeletedObject(object, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete object; } else { // add to active object list, check for duplicates. ObjectId oid = object->getObjectId(); ManagementObjectMap::iterator destIter = managementObjects.find(oid); if (destIter != managementObjects.end()) { // duplicate found. It is OK if the old object has been marked // deleted, just replace the old with the new. - ManagementObject *oldObj = destIter->second; - if (oldObj->isDeleted()) { - DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); - pendingDeletedObjs[dptr->getKey()].push_back(dptr); - delete oldObj; - } else { + ManagementObject::shared_ptr oldObj = destIter->second; + if (!oldObj->isDeleted()) { // Duplicate non-deleted objects? This is a user error - oids must be unique. // for now, leak the old object (safer than deleting - may still be referenced) // and complain loudly... QPID_LOG(error, "Detected two management objects with the same identifier: " << oid); + oldObj->resourceDestroy(); } + DeletedObject::shared_ptr dptr(new DeletedObject(oldObj, qmf1Support, qmf2Support)); + pendingDeletedObjs[dptr->getKey()].push_back(dptr); // QPID-3666: be sure to replace the -index- also, as non-key members of // the index object may be different for the new object! So erase the // entry, rather than []= assign here: @@ -720,32 +689,41 @@ void ManagementAgent::moveNewObjectsLH() void ManagementAgent::periodicProcessing (void) { -#define BUFSIZE 65536 #define HEADROOM 4096 debugSnapshot("Management agent periodic processing"); sys::Mutex::ScopedLock lock (userLock); - uint32_t contentSize; string routingKey; string sBuf; - moveNewObjectsLH(); + moveNewObjects(); // // If we're publishing updates, get the latest memory statistics and uptime now // if (publish) { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - qpid::sys::MemStat::loadMemInfo(memstat); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat.get()); + } + + // + // Use a copy of the management object map to avoid holding the objectLock + // + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); } // // Clear the been-here flag on all objects in the map. // - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; object->setFlags(0); if (clientWasAdded) { object->setForcePublish(true); @@ -760,22 +738,25 @@ void ManagementAgent::periodicProcessing (void) // if we sent the active update first, _then_ the delete update, clients // would incorrectly think the object was deleted. See QPID-2997 // - bool objectsDeleted = moveDeletedObjectsLH(); + bool objectsDeleted = moveDeletedObjects(); + PendingDeletedObjsMap localPendingDeletedObjs; + { + sys::Mutex::ScopedLock objLock(objectLock); + localPendingDeletedObjs.swap(pendingDeletedObjs); + } // // If we are not publishing updates, just clear the pending deletes. There's no // need to tell anybody. // if (!publish) - pendingDeletedObjs.clear(); - - if (!pendingDeletedObjs.empty()) { - // use a temporary copy of the pending deletes so dropping the lock when - // the buffer is sent is safe. - PendingDeletedObjsMap tmp(pendingDeletedObjs); - pendingDeletedObjs.clear(); + localPendingDeletedObjs.clear(); - for (PendingDeletedObjsMap::iterator mIter = tmp.begin(); mIter != tmp.end(); mIter++) { + ResizableBuffer msgBuffer(qmfV1BufferSize); + if (!localPendingDeletedObjs.empty()) { + for (PendingDeletedObjsMap::iterator mIter = localPendingDeletedObjs.begin(); + mIter != localPendingDeletedObjs.end(); + mIter++) { std::string packageName; std::string className; msgBuffer.reset(); @@ -807,11 +788,10 @@ void ManagementAgent::periodicProcessing (void) } if (v1Objs >= maxReplyObjs) { v1Objs = 0; - contentSize = msgBuffer.getSize(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -840,7 +820,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -850,11 +830,10 @@ void ManagementAgent::periodicProcessing (void) // send any remaining objects... if (v1Objs) { - contentSize = BUFSIZE - msgBuffer.available(); stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd V1 (delete) to=" << key.str() << " len=" << contentSize); } @@ -877,7 +856,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd V2 (delete) to=" << key.str() << " len=" << content.length()); } } @@ -885,9 +864,7 @@ void ManagementAgent::periodicProcessing (void) } // - // Process the entire object map. Remember: we drop the userLock each time we call - // sendBuffer(). This allows the managementObjects map to be altered during the - // sendBuffer() call, so always restart the search after a sendBuffer() call + // Process the entire object map. // // If publish is disabled, don't send any updates. // @@ -897,14 +874,14 @@ void ManagementAgent::periodicProcessing (void) uint32_t pcount; uint32_t scount; uint32_t v1Objs, v2Objs; - ManagementObjectMap::iterator baseIter; + ManagementObjectVector::iterator baseIter; std::string packageName; std::string className; - for (baseIter = managementObjects.begin(); - baseIter != managementObjects.end(); + for (baseIter = localManagementObjects.begin(); + baseIter != localManagementObjects.end(); baseIter++) { - ManagementObject* baseObject = baseIter->second; + ManagementObject::shared_ptr baseObject = *baseIter; // // Skip until we find a base object requiring processing... // @@ -915,7 +892,7 @@ void ManagementAgent::periodicProcessing (void) } } - if (baseIter == managementObjects.end()) + if (baseIter == localManagementObjects.end()) break; // done - all objects processed pcount = scount = 0; @@ -924,12 +901,12 @@ void ManagementAgent::periodicProcessing (void) list_.clear(); msgBuffer.reset(); - for (ManagementObjectMap::iterator iter = baseIter; - iter != managementObjects.end(); + for (ManagementObjectVector::iterator iter = baseIter; + iter != localManagementObjects.end(); iter++) { msgBuffer.makeAvailable(HEADROOM); // Make sure there's buffer space - ManagementObject* baseObject = baseIter->second; - ManagementObject* object = iter->second; + ManagementObject::shared_ptr baseObject = *baseIter; + ManagementObject::shared_ptr object = *iter; bool send_stats, send_props; if (baseObject->isSameClass(*object) && object->getFlags() == 0) { object->setFlags(1); @@ -1004,12 +981,11 @@ void ManagementAgent::periodicProcessing (void) if (pcount || scount) { if (qmf1Support) { - contentSize = BUFSIZE - msgBuffer.available(); - if (contentSize > 0) { + if (msgBuffer.getPosition() > 0) { stringstream key; key << "console.obj.1.0." << packageName << "." << className; - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, key.str()); // UNLOCKS USERLOCK + size_t contentSize = msgBuffer.getPosition(); + sendBuffer(msgBuffer, mExchange, key.str()); QPID_LOG(debug, "SEND V1 Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1035,7 +1011,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - sendBufferLH(content, "", headers, "amqp/list", v2Topic, key.str()); // UNLOCKS USERLOCK + sendBuffer(content, "", headers, "amqp/list", v2Topic, key.str(), 0); QPID_LOG(debug, "SEND Multicast ContentInd to=" << key.str() << " props=" << pcount << " stats=" << scount @@ -1045,21 +1021,21 @@ void ManagementAgent::periodicProcessing (void) } } // end processing updates for all objects - if (objectsDeleted) deleteOrphanedAgentsLH(); + if (objectsDeleted) { + sys::Mutex::ScopedLock lock (userLock); + deleteOrphanedAgentsLH(); + } // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. if (qmf1Support) { - uint32_t contentSize; - char msgChars[BUFSIZE]; - Buffer msgBuffer(msgChars, BUFSIZE); + char msgChars[qmfV1BufferSize]; + Buffer msgBuffer(msgChars, qmfV1BufferSize); encodeHeader(msgBuffer, 'h'); msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBufferLH(msgBuffer, contentSize, mExchange, routingKey); + sendBuffer(msgBuffer, mExchange, routingKey); QPID_LOG(debug, "SEND HeartbeatInd to=" << routingKey); } @@ -1087,23 +1063,26 @@ void ManagementAgent::periodicProcessing (void) // Set TTL (in msecs) on outgoing heartbeat indications based on the interval // time to prevent stale heartbeats from getting to the consoles. - sendBufferLH(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); + sendBuffer(content, "", headers, "amqp/map", v2Topic, addr_key.str(), interval * 2 * 1000); QPID_LOG(debug, "SENT AgentHeartbeat name=" << name_address); } } -void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) +void ManagementAgent::deleteObjectNow(const ObjectId& oid) { - ManagementObjectMap::iterator iter = managementObjects.find(oid); - if (iter == managementObjects.end()) - return; - ManagementObject* object = iter->second; - if (!object->isDeleted()) - return; + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(oid); + if (iter == managementObjects.end()) + return; + object = iter->second; + if (!object->isDeleted()) + return; + managementObjects.erase(oid); + } - // since sendBufferLH drops the userLock, don't call it until we - // are done manipulating the object. #define DNOW_BUFSIZE 2048 char msgChars[DNOW_BUFSIZE]; Buffer msgBuffer(msgChars, DNOW_BUFSIZE); @@ -1139,15 +1118,12 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) v2key << "." << instanceNameKey; } - object = 0; - managementObjects.erase(oid); + object.reset(); // object deleted, ok to drop lock now. if (publish && qmf1Support) { - uint32_t contentSize = msgBuffer.getPosition(); - msgBuffer.reset(); - sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); + sendBuffer(msgBuffer, mExchange, v1key.str()); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } @@ -1160,29 +1136,26 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) string content; ListCodec::encode(list_, content); - sendBufferLH(content, "", headers, "amqp/list", v2Topic, v2key.str()); + sendBuffer(content, "", headers, "amqp/list", v2Topic, v2key.str(), 0); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v2key.str()); } } -void ManagementAgent::sendCommandCompleteLH(const string& replyToKey, uint32_t sequence, - uint32_t code, const string& text) +void ManagementAgent::sendCommandComplete(const string& replyToKey, uint32_t sequence, + uint32_t code, const string& text) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'z', sequence); outBuffer.putLong (code); outBuffer.putShortString (text); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND CommandCompleteInd code=" << code << " text=" << text << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, const string& cid, - const string& text, uint32_t code, bool viaLocal) +void ManagementAgent::sendException(const string& rte, const string& rtk, const string& cid, + const string& text, uint32_t code, bool viaLocal) { static const string addr_exchange("qmf.default.direct"); @@ -1200,7 +1173,7 @@ void ManagementAgent::sendExceptionLH(const string& rte, const string& rtk, cons map["_values"] = values; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SENT Exception code=" << code <<" text=" << text); } @@ -1211,7 +1184,6 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const bool topic, int qmfVersion) { - sys::Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); if (topic && qmfVersion == 1) { @@ -1225,23 +1197,23 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // schema.# if (routingKey == "broker") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.length() > 6) { if (routingKey.compare(0, 9, "agent.1.0") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return false; } if (routingKey.compare(0, 8, "agent.1.") == 0) { - return authorizeAgentMessageLH(msg); + return authorizeAgentMessage(msg); } if (routingKey.compare(0, 7, "schema.") == 0) { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } } @@ -1253,7 +1225,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // Intercept messages bound to: // "console.ind.locate.# - process these messages, and also allow them to be forwarded. if (routingKey == "console.request.agent_locate") { - dispatchAgentCommandLH(msg); + dispatchAgentCommand(msg); return true; } @@ -1264,7 +1236,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, // "<name_address>" - the broker agent's proper name // and do not forward them futher if (routingKey == "broker" || routingKey == name_address) { - dispatchAgentCommandLH(msg, routingKey == "broker"); + dispatchAgentCommand(msg, routingKey == "broker"); return false; } } @@ -1273,16 +1245,15 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, return true; } -void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleMethodRequest(Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; string packageName; string className; uint8_t hash[16]; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); AclModule* acl = broker->getAcl(); string inArgs; @@ -1304,9 +1275,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (disallowAllV1Methods) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString("QMFv1 methods forbidden on this broker, use QMFv2"); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN reason='All QMFv1 Methods Forbidden' seq=" << sequence); return; } @@ -1315,9 +1284,7 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (i != disallowed.end()) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(i->second); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN text=" << i->second << " seq=" << sequence); return; } @@ -1331,30 +1298,34 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); return; } } - ManagementObjectMap::iterator iter = numericFind(objId); - if (iter == managementObjects.end() || iter->second->isDeleted()) { + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (!object || object->isDeleted()) { outBuffer.putLong (Manageable::STATUS_UNKNOWN_OBJECT); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_UNKNOWN_OBJECT)); } else { - if ((iter->second->getPackageName() != packageName) || - (iter->second->getClassName() != className)) { + if ((object->getPackageName() != packageName) || + (object->getClassName() != className)) { outBuffer.putLong (Manageable::STATUS_PARAMETER_INVALID); outBuffer.putMediumString(Manageable::StatusText (Manageable::STATUS_PARAMETER_INVALID)); } else { uint32_t pos = outBuffer.getPosition(); try { - sys::Mutex::ScopedUnlock u(userLock); string outBuf; - iter->second->doMethod(methodName, inArgs, outBuf, userId); + object->doMethod(methodName, inArgs, outBuf, userId); outBuffer.putRawData(outBuf); } catch(exception& e) { outBuffer.setPosition(pos);; @@ -1364,17 +1335,15 @@ void ManagementAgent::handleMethodRequestLH(Buffer& inBuffer, const string& repl } } - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND MethodResponse (v1) to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleMethodRequestLH (const string& body, const string& rte, const string& rtk, - const string& cid, const ConnectionToken* connToken, bool viaLocal) +void ManagementAgent::handleMethodRequest (const string& body, const string& rte, const string& rtk, + const string& cid, const ConnectionToken* connToken, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); string methodName; Variant::Map inMap; @@ -1393,8 +1362,8 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r if ((oid = inMap.find("_object_id")) == inMap.end() || (mid = inMap.find("_method_name")) == inMap.end()) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), - Manageable::STATUS_PARAMETER_INVALID, viaLocal); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID), + Manageable::STATUS_PARAMETER_INVALID, viaLocal); return; } @@ -1412,16 +1381,22 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r inArgs = (mid->second).asMap(); } } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } - ManagementObjectMap::iterator iter = managementObjects.find(objId); + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } - if (iter == managementObjects.end() || iter->second->isDeleted()) { + if (!object || object->isDeleted()) { stringstream estr; estr << "No object found with ID=" << objId; - sendExceptionLH(rte, rtk, cid, estr.str(), 1, viaLocal); + sendException(rte, rtk, cid, estr.str(), 1, viaLocal); return; } @@ -1429,20 +1404,20 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r AclModule* acl = broker->getAcl(); DisallowedMethods::const_iterator i; - i = disallowed.find(make_pair(iter->second->getClassName(), methodName)); + i = disallowed.find(make_pair(object->getClassName(), methodName)); if (i != disallowed.end()) { - sendExceptionLH(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); + sendException(rte, rtk, cid, i->second, Manageable::STATUS_FORBIDDEN, viaLocal); return; } string userId = ((const qpid::broker::ConnectionState*) connToken)->getUserId(); if (acl != 0) { map<acl::Property, string> params; - params[acl::PROP_SCHEMAPACKAGE] = iter->second->getPackageName(); - params[acl::PROP_SCHEMACLASS] = iter->second->getClassName(); + params[acl::PROP_SCHEMAPACKAGE] = object->getPackageName(); + params[acl::PROP_SCHEMACLASS] = object->getClassName(); if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_METHOD, methodName, ¶ms)) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), Manageable::STATUS_FORBIDDEN, viaLocal); return; } @@ -1450,13 +1425,12 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r // invoke the method - QPID_LOG(debug, "RECV MethodRequest (v2) class=" << iter->second->getPackageName() - << ":" << iter->second->getClassName() << " method=" << + QPID_LOG(debug, "RECV MethodRequest (v2) class=" << object->getPackageName() + << ":" << object->getClassName() << " method=" << methodName << " replyTo=" << rte << "/" << rtk << " objId=" << objId << " inArgs=" << inArgs); try { - sys::Mutex::ScopedUnlock u(userLock); - iter->second->doMethod(methodName, inArgs, callMap, userId); + object->doMethod(methodName, inArgs, callMap, userId); errorCode = callMap["_status_code"].asUint32(); if (errorCode == 0) { outMap["_arguments"] = Variant::Map(); @@ -1467,62 +1441,59 @@ void ManagementAgent::handleMethodRequestLH (const string& body, const string& r } else error = callMap["_status_text"].asString(); } catch(exception& e) { - sendExceptionLH(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); + sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION, viaLocal); return; } if (errorCode != 0) { - sendExceptionLH(rte, rtk, cid, error, errorCode, viaLocal); + sendException(rte, rtk, cid, error, errorCode, viaLocal); return; } MapCodec::encode(outMap, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); QPID_LOG(debug, "SEND MethodResponse (v2) to=" << rte << "/" << rtk << " seq=" << cid << " map=" << outMap); } -void ManagementAgent::handleBrokerRequestLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleBrokerRequest (Buffer&, const string& replyToKey, uint32_t sequence) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); QPID_LOG(debug, "RECV BrokerRequest replyTo=" << replyToKey); encodeHeader (outBuffer, 'b', sequence); uuid.encode (outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND BrokerResponse to=" << replyToKey); } -void ManagementAgent::handlePackageQueryLH (Buffer&, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageQuery (Buffer&, const string& replyToKey, uint32_t sequence) { QPID_LOG(debug, "RECV PackageQuery replyTo=" << replyToKey); - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); - for (PackageMap::iterator pIter = packages.begin (); - pIter != packages.end (); - pIter++) { - encodeHeader (outBuffer, 'p', sequence); - encodePackageIndication (outBuffer, pIter); + sys::Mutex::ScopedLock lock(userLock); + for (PackageMap::iterator pIter = packages.begin (); + pIter != packages.end (); + pIter++) + { + encodeHeader (outBuffer, 'p', sequence); + encodePackageIndication (outBuffer, pIter); + } } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND PackageInd to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handlePackageInd (Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1530,10 +1501,11 @@ void ManagementAgent::handlePackageIndLH (Buffer& inBuffer, const string& replyT QPID_LOG(debug, "RECV PackageInd package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); findOrAddPackageLH(packageName); } -void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleClassQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { string packageName; @@ -1541,40 +1513,39 @@ void ManagementAgent::handleClassQueryLH(Buffer& inBuffer, const string& replyTo QPID_LOG(debug, "RECV ClassQuery package=" << packageName << " replyTo=" << replyToKey << " seq=" << sequence); - PackageMap::iterator pIter = packages.find(packageName); - if (pIter != packages.end()) + typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; + std::list<_ckeyType> classes; { - typedef std::pair<SchemaClassKey, uint8_t> _ckeyType; - std::list<_ckeyType> classes; - ClassMap &cMap = pIter->second; - for (ClassMap::iterator cIter = cMap.begin(); - cIter != cMap.end(); - cIter++) { - if (cIter->second.hasSchema()) { - classes.push_back(make_pair(cIter->first, cIter->second.kind)); + sys::Mutex::ScopedLock lock(userLock); + PackageMap::iterator pIter = packages.find(packageName); + if (pIter != packages.end()) + { + ClassMap &cMap = pIter->second; + for (ClassMap::iterator cIter = cMap.begin(); + cIter != cMap.end(); + cIter++) { + if (cIter->second.hasSchema()) { + classes.push_back(make_pair(cIter->first, cIter->second.kind)); + } } } + } - while (classes.size()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; - - encodeHeader(outBuffer, 'q', sequence); - encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + while (classes.size()) { + ResizableBuffer outBuffer(qmfV1BufferSize); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); - QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << - "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); - classes.pop_front(); - } + encodeHeader(outBuffer, 'q', sequence); + encodeClassIndication(outBuffer, packageName, classes.front().first, classes.front().second); + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << classes.front().first.name << + "(" << Uuid(classes.front().first.hash) << ") to=" << replyToKey << " seq=" << sequence); + classes.pop_front(); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToKey, uint32_t) +void ManagementAgent::handleClassInd (Buffer& inBuffer, const string& replyToKey, uint32_t) { string packageName; SchemaClassKey key; @@ -1587,20 +1558,18 @@ void ManagementAgent::handleClassIndLH (Buffer& inBuffer, const string& replyToK QPID_LOG(debug, "RECV ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << replyToKey); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); ClassMap::iterator cIter = pIter->second.find(key); if (cIter == pIter->second.end() || !cIter->second.hasSchema()) { - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); uint32_t sequence = nextRequestSequence++; // Schema Request encodeHeader (outBuffer, 'S', sequence); outBuffer.putShortString(packageName); key.encode(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), to=" << replyToKey << " seq=" << sequence); @@ -1625,7 +1594,7 @@ void ManagementAgent::SchemaClass::appendSchema(Buffer& buf) buf.putRawData(reinterpret_cast<uint8_t*>(&data[0]), data.size()); } -void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) +void ManagementAgent::handleSchemaRequest(Buffer& inBuffer, const string& rte, const string& rtk, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1636,34 +1605,32 @@ void ManagementAgent::handleSchemaRequestLH(Buffer& inBuffer, const string& rte, QPID_LOG(debug, "RECV SchemaRequest class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << "), replyTo=" << rte << "/" << rtk << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; ClassMap::iterator cIter = cMap.find(key); if (cIter != cMap.end()) { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); SchemaClass& classInfo = cIter->second; if (classInfo.hasSchema()) { encodeHeader(outBuffer, 's', sequence); classInfo.appendSchema(outBuffer); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); QPID_LOG(debug, "SEND SchemaResponse to=" << rte << "/" << rtk << " seq=" << sequence); } else - sendCommandCompleteLH(rtk, sequence, 1, "Schema not available"); + sendCommandComplete(rtk, sequence, 1, "Schema not available"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Class key not found"); + sendCommandComplete(rtk, sequence, 1, "Class key not found"); } else - sendCommandCompleteLH(rtk, sequence, 1, "Package not found"); + sendCommandComplete(rtk, sequence, 1, "Package not found"); } -void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) +void ManagementAgent::handleSchemaResponse(Buffer& inBuffer, const string& /*replyToKey*/, uint32_t sequence) { string packageName; SchemaClassKey key; @@ -1676,6 +1643,7 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r QPID_LOG(debug, "RECV SchemaResponse class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " seq=" << sequence); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = packages.find(packageName); if (pIter != packages.end()) { ClassMap& cMap = pIter->second; @@ -1690,14 +1658,11 @@ void ManagementAgent::handleSchemaResponseLH(Buffer& inBuffer, const string& /*r inBuffer.getRawData(reinterpret_cast<uint8_t*>(&cIter->second.data[0]), length); // Publish a class-indication message - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'q'); encodeClassIndication(outBuffer, pIter->first, cIter->first, cIter->second.kind); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, mExchange, "schema.class"); + sendBuffer(outBuffer, mExchange, "schema.class"); QPID_LOG(debug, "SEND ClassInd class=" << packageName << ":" << key.name << "(" << Uuid(key.hash) << ")" << " to=schema.class"); } @@ -1756,7 +1721,7 @@ void ManagementAgent::deleteOrphanedAgentsLH() remoteAgents.erase(*dIter); } -void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) +void ManagementAgent::handleAttachRequest (Buffer& inBuffer, const string& replyToKey, uint32_t sequence, const ConnectionToken* connToken) { string label; uint32_t requestedBrokerBank, requestedAgentBank; @@ -1764,12 +1729,14 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep ObjectId connectionRef = ((const ConnectionState*) connToken)->GetManagementObject()->getObjectId(); Uuid systemId; - moveNewObjectsLH(); + moveNewObjects(); + + sys::Mutex::ScopedLock lock(userLock); deleteOrphanedAgentsLH(); RemoteAgentMap::iterator aIter = remoteAgents.find(connectionRef); if (aIter != remoteAgents.end()) { // There already exists an agent on this session. Reject the request. - sendCommandCompleteLH(replyToKey, sequence, 1, "Connection already has remote agent"); + sendCommandComplete(replyToKey, sequence, 1, "Connection already has remote agent"); return; } @@ -1788,7 +1755,7 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep agent->agentBank = assignedBank; agent->routingKey = replyToKey; agent->connectionRef = connectionRef; - agent->mgmtObject = new _qmf::Agent (this, agent.get()); + agent->mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent (this, agent.get())); agent->mgmtObject->set_connectionRef(agent->connectionRef); agent->mgmtObject->set_label (label); agent->mgmtObject->set_registeredTo (broker->GetManagementObject()->getObjectId()); @@ -1801,25 +1768,22 @@ void ManagementAgent::handleAttachRequestLH (Buffer& inBuffer, const string& rep QPID_LOG(debug, "Remote Agent registered bank=[" << brokerBank << "." << assignedBank << "] replyTo=" << replyToKey); // Send an Attach Response - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'a', sequence); outBuffer.putLong (brokerBank); outBuffer.putLong (assignedBank); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND AttachResponse brokerBank=" << brokerBank << " agentBank=" << assignedBank << " to=" << replyToKey << " seq=" << sequence); } -void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) +void ManagementAgent::handleGetQuery(Buffer& inBuffer, const string& replyToKey, uint32_t sequence) { FieldTable ft; FieldTable::ValuePtr value; - moveNewObjectsLH(); + moveNewObjects(); ft.decode(inBuffer); @@ -1832,11 +1796,17 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe return; ObjectId selector(value->get<string>()); - ManagementObjectMap::iterator iter = numericFind(selector); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock(objectLock); + ManagementObjectMap::iterator iter = numericFind(selector); + if (iter != managementObjects.end()) + object = iter->second; + } + + if (object) { + ResizableBuffer outBuffer (qmfV1BufferSize); if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -1849,89 +1819,80 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe sBuf.clear(); object->writeStatistics(sBuf, true); outBuffer.putRawData(sBuf); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); return; } string className (value->get<string>()); - std::list<ObjectId>matches; + std::list<ManagementObject::shared_ptr> matches; if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } // build up a set of all objects to be dumped - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName () == className) { - matches.push_back(object->getObjectId()); + { + sys::Mutex::ScopedLock lock(objectLock); + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject::shared_ptr object = iter->second; + if (object->getClassName () == className) { + matches.push_back(object); + } } } - // send them (as sendBufferLH drops the userLock) - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + // send them + ResizableBuffer outBuffer (qmfV1BufferSize); while (matches.size()) { - ObjectId objId = matches.front(); - ManagementObjectMap::iterator oIter = managementObjects.find( objId ); - if (oIter != managementObjects.end()) { - ManagementObject* object = oIter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - if (!object->isDeleted()) { - string sProps, sStats; - object->writeProperties(sProps); - object->writeStatistics(sStats, true); - - size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. - if (len > MA_BUFFER_SIZE) { - QPID_LOG(error, "Object " << objId << " too large for output buffer - discarded!"); - } else { - if (outBuffer.available() < len) { // not enough room in current buffer, send it. - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); // drops lock - QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); - continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. - } - encodeHeader(outBuffer, 'g', sequence); - outBuffer.putRawData(sProps); - outBuffer.putRawData(sStats); + ManagementObject::shared_ptr object = matches.front(); + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + if (!object->isDeleted()) { + string sProps, sStats; + object->writeProperties(sProps); + object->writeStatistics(sStats, true); + + size_t len = 8 + sProps.length() + sStats.length(); // 8 == size of header in bytes. + if (len > qmfV1BufferSize) { + QPID_LOG(error, "Object " << object->getObjectId() << " too large for output buffer - discarded!"); + } else { + if (outBuffer.available() < len) { // not enough room in current buffer, send it. + sendBuffer(outBuffer, dExchange, replyToKey); + QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); + continue; // lock dropped, need to re-find _SAME_ objid as it may have been deleted. } + encodeHeader(outBuffer, 'g', sequence); + outBuffer.putRawData(sProps); + outBuffer.putRawData(sStats); } } matches.pop_front(); } - outLen = MA_BUFFER_SIZE - outBuffer.available (); - if (outLen) { - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, dExchange, replyToKey); + if (outBuffer.getPosition() > 0) { + sendBuffer(outBuffer, dExchange, replyToKey); QPID_LOG(debug, "SEND GetResponse (v1) to=" << replyToKey << " seq=" << sequence); } - sendCommandCompleteLH(replyToKey, sequence); + sendCommandComplete(replyToKey, sequence); } -void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) +void ManagementAgent::handleGetQuery(const string& body, const string& rte, const string& rtk, const string& cid, bool viaLocal) { - moveNewObjectsLH(); + moveNewObjects(); Variant::Map inMap; Variant::Map::const_iterator i; @@ -1950,17 +1911,17 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co */ i = inMap.find("_what"); if (i == inMap.end()) { - sendExceptionLH(rte, rtk, cid, "_what element missing in Query"); + sendException(rte, rtk, cid, "_what element missing in Query"); return; } if (i->second.getType() != qpid::types::VAR_STRING) { - sendExceptionLH(rte, rtk, cid, "_what element is not a string"); + sendException(rte, rtk, cid, "_what element is not a string"); return; } if (i->second.asString() != "OBJECT") { - sendExceptionLH(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported"); return; } @@ -1984,11 +1945,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co } if (className == "memory") - qpid::sys::MemStat::loadMemInfo(memstat); + qpid::sys::MemStat::loadMemInfo(memstat.get()); if (className == "broker") { uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + boost::dynamic_pointer_cast<_qmf::Broker>(broker->GetManagementObject())->set_uptime(uptime); } /* @@ -2000,10 +1961,14 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List list_; ObjectId objId(i->second.asMap()); - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - + ManagementObject::shared_ptr object; + { + sys::Mutex::ScopedLock lock (objectLock); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) + object = iter->second; + } + if (object) { if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); @@ -2027,7 +1992,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; ListCodec::encode(list_, content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk); return; } @@ -2037,10 +2002,18 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::List _subList; unsigned int objCount = 0; - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); + ManagementObjectVector localManagementObjects; + { + sys::Mutex::ScopedLock objLock(objectLock); + std::transform(managementObjects.begin(), managementObjects.end(), + std::back_inserter(localManagementObjects), + boost::bind(&ManagementObjectMap::value_type::second, _1)); + } + + for (ManagementObjectVector::iterator iter = localManagementObjects.begin(); + iter != localManagementObjects.end(); iter++) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = *iter; if (object->getClassName() == className && (packageName.empty() || object->getPackageName() == packageName)) { @@ -2055,7 +2028,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co object->writeTimestamps(map_); object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); + object->getObjectId().mapEncode(oidMap); map_["_values"] = values; map_["_object_id"] = oidMap; @@ -2080,13 +2053,13 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co string content; while (_list.size() > 1) { ListCodec::encode(_list.front().asList(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); _list.pop_front(); QPID_LOG(debug, "SENT QueryResponse (partial, query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); } headers.erase("partial"); ListCodec::encode(_list.size() ? _list.front().asList() : Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk << " len=" << content.length()); return; } @@ -2094,12 +2067,12 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co // Unrecognized query - Send empty message to indicate CommandComplete string content; ListCodec::encode(Variant::List(), content); - sendBufferLH(content, cid, headers, "amqp/list", rte, rtk); + sendBuffer(content, cid, headers, "amqp/list", rte, rtk); QPID_LOG(debug, "SENT QueryResponse (empty) to=" << rte << "/" << rtk); } -void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, const string& rtk, const string& cid) +void ManagementAgent::handleLocateRequest(const string&, const string& rte, const string& rtk, const string& cid) { QPID_LOG(debug, "RCVD AgentLocateRequest"); @@ -2117,16 +2090,17 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& rte, co string content; MapCodec::encode(map, content); - sendBufferLH(content, cid, headers, "amqp/map", rte, rtk); + sendBuffer(content, cid, headers, "amqp/map", rte, rtk); clientWasAdded = true; QPID_LOG(debug, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk); } -bool ManagementAgent::authorizeAgentMessageLH(Message& msg) +bool ManagementAgent::authorizeAgentMessage(Message& msg) { - Buffer inBuffer (inputBuffer, MA_BUFFER_SIZE); + sys::Mutex::ScopedLock lock(userLock); + ResizableBuffer inBuffer (qmfV1BufferSize); uint32_t sequence = 0; bool methodReq = false; bool mapMsg = false; @@ -2140,7 +2114,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) // authorized or not. In this case, return true (authorized) if there is no ACL in place, // otherwise return false; // - if (msg.getContentSize() > MA_BUFFER_SIZE) + if (msg.getContentSize() > qmfV1BufferSize) return broker->getAcl() == 0; inBuffer.putRawData(msg.getContent()); @@ -2193,11 +2167,11 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) } // look up schema for object to get package and class name - + sys::Mutex::ScopedLock lock(objectLock); ManagementObjectMap::iterator iter = managementObjects.find(objId); if (iter == managementObjects.end() || iter->second->isDeleted()) { - QPID_LOG(debug, "ManagementAgent::authorizeAgentMessageLH: stale object id " << + QPID_LOG(debug, "ManagementAgent::authorizeAgentMessage: stale object id " << objId); return false; } @@ -2256,19 +2230,16 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) cid = p->getCorrelationId(); if (mapMsg) { - sendExceptionLH(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), - Manageable::STATUS_FORBIDDEN, false); + sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_FORBIDDEN), + Manageable::STATUS_FORBIDDEN, false); } else { - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer(qmfV1BufferSize); encodeHeader(outBuffer, 'm', sequence); outBuffer.putLong(Manageable::STATUS_FORBIDDEN); outBuffer.putMediumString(Manageable::StatusText(Manageable::STATUS_FORBIDDEN)); - outLen = MA_BUFFER_SIZE - outBuffer.available(); - outBuffer.reset(); - sendBufferLH(outBuffer, outLen, rte, rtk); + sendBuffer(outBuffer, rte, rtk); } QPID_LOG(debug, "SEND MethodResponse status=FORBIDDEN" << " seq=" << sequence); @@ -2280,7 +2251,7 @@ bool ManagementAgent::authorizeAgentMessageLH(Message& msg) return true; } -void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) +void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) { string rte; string rtk; @@ -2295,10 +2266,10 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) else return; - Buffer inBuffer(inputBuffer, MA_BUFFER_SIZE); + ResizableBuffer inBuffer(qmfV1BufferSize); uint8_t opcode; - if (msg.getContentSize() > MA_BUFFER_SIZE) { + if (msg.getContentSize() > qmfV1BufferSize) { QPID_LOG(debug, "ManagementAgent::dispatchAgentCommandLH: Message too large: " << msg.getContentSize()); return; @@ -2317,39 +2288,38 @@ void ManagementAgent::dispatchAgentCommandLH(Message& msg, bool viaLocal) string body; string cid; inBuffer.getRawData(body, bufferLen); + { + if (p && p->hasCorrelationId()) { + cid = p->getCorrelationId(); + } - if (p && p->hasCorrelationId()) { - cid = p->getCorrelationId(); + if (opcode == "_method_request") + return handleMethodRequest(body, rte, rtk, cid, msg.getPublisher(), viaLocal); + else if (opcode == "_query_request") + return handleGetQuery(body, rte, rtk, cid, viaLocal); + else if (opcode == "_agent_locate_request") + return handleLocateRequest(body, rte, rtk, cid); } - - if (opcode == "_method_request") - return handleMethodRequestLH(body, rte, rtk, cid, msg.getPublisher(), viaLocal); - else if (opcode == "_query_request") - return handleGetQueryLH(body, rte, rtk, cid, viaLocal); - else if (opcode == "_agent_locate_request") - return handleLocateRequestLH(body, rte, rtk, cid); - QPID_LOG(warning, "Support for QMF Opcode [" << opcode << "] TBD!!!"); return; } // old preV2 binary messages - while (inBuffer.getPosition() < bufferLen) { uint32_t sequence; if (!checkHeader(inBuffer, &opcode, &sequence)) return; - if (opcode == 'B') handleBrokerRequestLH (inBuffer, rtk, sequence); - else if (opcode == 'P') handlePackageQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'p') handlePackageIndLH (inBuffer, rtk, sequence); - else if (opcode == 'Q') handleClassQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'q') handleClassIndLH (inBuffer, rtk, sequence); - else if (opcode == 'S') handleSchemaRequestLH (inBuffer, rte, rtk, sequence); - else if (opcode == 's') handleSchemaResponseLH (inBuffer, rtk, sequence); - else if (opcode == 'A') handleAttachRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); - else if (opcode == 'G') handleGetQueryLH (inBuffer, rtk, sequence); - else if (opcode == 'M') handleMethodRequestLH (inBuffer, rtk, sequence, msg.getPublisher()); + if (opcode == 'B') handleBrokerRequest (inBuffer, rtk, sequence); + else if (opcode == 'P') handlePackageQuery (inBuffer, rtk, sequence); + else if (opcode == 'p') handlePackageInd (inBuffer, rtk, sequence); + else if (opcode == 'Q') handleClassQuery (inBuffer, rtk, sequence); + else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence); + else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence); + else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence); + else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisher()); + else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence); + else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisher()); } } @@ -2365,14 +2335,11 @@ ManagementAgent::PackageMap::iterator ManagementAgent::findOrAddPackageLH(string QPID_LOG (debug, "ManagementAgent added package " << name); // Publish a package-indication message - Buffer outBuffer (outputBuffer, MA_BUFFER_SIZE); - uint32_t outLen; + ResizableBuffer outBuffer (qmfV1BufferSize); encodeHeader (outBuffer, 'p'); encodePackageIndication (outBuffer, result.first); - outLen = MA_BUFFER_SIZE - outBuffer.available (); - outBuffer.reset (); - sendBufferLH(outBuffer, outLen, mExchange, "schema.package"); + sendBuffer(outBuffer, mExchange, "schema.package"); QPID_LOG(debug, "SEND PackageInd package=" << name << " to=schema.package"); return result.first; @@ -2684,7 +2651,7 @@ void ManagementAgent::RemoteAgent::mapDecode(const Variant::Map& map_) { connectionRef.mapDecode(i->second.asMap()); } - mgmtObject = new _qmf::Agent(&agent, this); + mgmtObject = _qmf::Agent::shared_ptr(new _qmf::Agent(&agent, this)); if ((i = map_.find("_values")) != map_.end()) { mgmtObject->mapDecodeValues(i->second.asMap()); @@ -2827,8 +2794,8 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) sys::Mutex::ScopedLock lock (userLock); - moveNewObjectsLH(); - moveDeletedObjectsLH(); + moveNewObjects(); + moveDeletedObjects(); // now copy the pending deletes into the outList for (PendingDeletedObjsMap::iterator mIter = pendingDeletedObjs.begin(); @@ -2845,15 +2812,15 @@ void ManagementAgent::exportDeletedObjects(DeletedObjectList& outList) void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) { sys::Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock objLock(objectLock); // Clear out any existing deleted objects - moveNewObjectsLH(); + moveNewObjects(); pendingDeletedObjs.clear(); ManagementObjectMap::iterator i = managementObjects.begin(); // Silently drop any deleted objects left over from receiving the update. while (i != managementObjects.end()) { - ManagementObject* object = i->second; + ManagementObject::shared_ptr object = i->second; if (object->isDeleted()) { - delete object; managementObjects.erase(i++); } else ++i; @@ -2867,7 +2834,7 @@ void ManagementAgent::importDeletedObjects(const DeletedObjectList& inList) // construct a DeletedObject from a management object. -ManagementAgent::DeletedObject::DeletedObject(ManagementObject *src, bool v1, bool v2) +ManagementAgent::DeletedObject::DeletedObject(ManagementObject::shared_ptr src, bool v1, bool v2) : packageName(src->getPackageName()), className(src->getClassName()) { @@ -2943,14 +2910,17 @@ void ManagementAgent::DeletedObject::encode(std::string& toBuffer) } // Remove Deleted objects, and save for later publishing... -bool ManagementAgent::moveDeletedObjectsLH() { - typedef vector<pair<ObjectId, ManagementObject*> > DeleteList; +bool ManagementAgent::moveDeletedObjects() { + typedef vector<pair<ObjectId, ManagementObject::shared_ptr> > DeleteList; + + sys::Mutex::ScopedLock lock (objectLock); + DeleteList deleteList; for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); ++iter) { - ManagementObject* object = iter->second; + ManagementObject::shared_ptr object = iter->second; if (object->isDeleted()) deleteList.push_back(*iter); } @@ -2959,13 +2929,12 @@ bool ManagementAgent::moveDeletedObjectsLH() { iter != deleteList.rend(); iter++) { - ManagementObject* delObj = iter->second; + ManagementObject::shared_ptr delObj = iter->second; assert(delObj->isDeleted()); DeletedObject::shared_ptr dptr(new DeletedObject(delObj, qmf1Support, qmf2Support)); pendingDeletedObjs[dptr->getKey()].push_back(dptr); managementObjects.erase(iter->first); - delete iter->second; } return !deleteList.empty(); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index c7e830dcf5..fba733a984 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -37,6 +37,7 @@ #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/ResizableBuffer.h> +#include <boost/shared_ptr.hpp> #include <memory> #include <string> #include <map> @@ -100,12 +101,12 @@ public: const std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0, - bool persistent = false); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - const std::string& key, - bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + const std::string& key, + bool persistent = false); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); @@ -158,7 +159,7 @@ public: class DeletedObject { public: typedef boost::shared_ptr<DeletedObject> shared_ptr; - DeletedObject(ManagementObject *, bool v1, bool v2); + DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); DeletedObject( const std::string &encoded ); ~DeletedObject() {}; void encode( std::string& toBuffer ); @@ -207,9 +208,9 @@ private: uint32_t agentBank; std::string routingKey; ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} - ManagementObject* GetManagementObject (void) const { return mgmtObject; } + qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); void mapEncode(qpid::types::Variant::Map& _map) const; @@ -276,7 +277,7 @@ private: PackageMap packages; // - // Protected by userLock + // Protected by objectLock // ManagementObjectMap managementObjects; @@ -288,11 +289,11 @@ private: framing::Uuid uuid; // - // Lock hierarchy: If a thread needs to take both addLock and userLock, - // it MUST take userLock first, then addLock. + // Lock ordering: userLock -> addLock -> objectLock // sys::Mutex userLock; sys::Mutex addLock; + sys::Mutex objectLock; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; @@ -335,53 +336,45 @@ private: // list of objects that have been deleted, but have yet to be published // one final time. // Indexed by a string composed of the object's package and class name. - // Protected by userLock. + // Protected by objectLock. typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; PendingDeletedObjsMap pendingDeletedObjs; -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - framing::ResizableBuffer msgBuffer; - // // Memory statistics object // - qmf::org::apache::qpid::broker::Memory *memstat; + qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; void writeData (); void periodicProcessing (void); - void deleteObjectNowLH(const ObjectId& oid); + void deleteObjectNow(const ObjectId& oid); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - const std::string& exchange, - const std::string& routingKey); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - const std::string& exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void moveNewObjectsLH(); - bool moveDeletedObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); + void sendBuffer(framing::Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBuffer(framing::Buffer& buf, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjects(); + bool moveDeletedObjects(); + + bool authorizeAgentMessage(qpid::broker::Message& msg); + void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false); PackageMap::iterator findOrAddPackageLH(std::string name); void addClassLH(uint8_t kind, @@ -399,22 +392,22 @@ private: uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); void deleteOrphanedAgentsLH(); - void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, - uint32_t code = 0, const std::string& text = "OK"); - void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); - void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); - void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); - void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + void sendCommandComplete(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); + void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); + void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); size_t validateSchema(framing::Buffer&, uint8_t kind); diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 02aa87f876..9c21e51a18 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -41,751 +41,700 @@ using namespace qpid::types; namespace qpid { - namespace tests { - - namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test; - namespace { - - typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr; - typedef std::vector<TestObjectPtr> TestObjectVector; - - // Instantiates a broker and its internal management agent. Provides - // factories for constructing Receivers for object indication messages. - // - class AgentFixture - { - MessagingFixture *mFix; - - public: - AgentFixture( unsigned int pubInterval=10, - bool qmfV2=false, - qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) - { - opts.enableMgmt=true; - opts.qmf2Support=qmfV2; - opts.mgmtPubInterval=pubInterval; - mFix = new MessagingFixture(opts, true); - - _qmf::TestObject::registerSelf(getBrokerAgent()); - }; - ~AgentFixture() - { - delete mFix; - }; - ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); } - Receiver createV1DataIndRcvr( const std::string package, const std::string klass ) - { - return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " - "node: {type: queue, " - "x-bindings: [{exchange: qpid.management, " - "key: 'console.obj.1.0.") - + package + std::string(".") + klass - + std::string("'}]}}")); - }; - Receiver createV2DataIndRcvr( const std::string package, const std::string klass ) - { - std::string p(package); - std::replace(p.begin(), p.end(), '.', '_'); - std::string k(klass); - std::replace(k.begin(), k.end(), '.', '_'); - - return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " - "node: {type: queue, " - "x-bindings: [{exchange: qmf.default.topic, " - "key: 'agent.ind.data.") - + p + std::string(".") + k - + std::string("'}]}}")); - }; - }; - - - // A "management object" that supports the TestObject - // - class TestManageable : public qpid::management::Manageable - { - management::ManagementObject* mgmtObj; - const std::string key; - public: - TestManageable(management::ManagementAgent *agent, std::string _key) - : key(_key) - { - _qmf::TestObject *tmp = new _qmf::TestObject(agent, this); - - // seed it with some default values... - tmp->set_string1(key); - tmp->set_bool1(true); - qpid::types::Variant::Map vMap; - vMap["one"] = qpid::types::Variant(1); - vMap["two"] = qpid::types::Variant("two"); - vMap["three"] = qpid::types::Variant("whatever"); - tmp->set_map1(vMap); - - mgmtObj = tmp; - }; - ~TestManageable() { mgmtObj = 0; /* deleted by agent on shutdown */ }; - management::ManagementObject* GetManagementObject() const { return mgmtObj; }; - static void validateTestObjectProperties(_qmf::TestObject& to) - { - // verify the default values are as expected. We don't check 'string1', - // as it is the object key, and is unique for each object (no default value). - BOOST_CHECK(to.get_bool1() == true); - BOOST_CHECK(to.get_map1().size() == 3); - qpid::types::Variant::Map mappy = to.get_map1(); - BOOST_CHECK(1 == (unsigned int)mappy["one"]); - BOOST_CHECK(mappy["two"].asString() == std::string("two")); - BOOST_CHECK(mappy["three"].asString() == std::string("whatever")); - }; - }; - - - // decode a V1 Content Indication message - // - void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen) - { - const size_t MAX_BUFFER_SIZE=65536; - char tmp[MAX_BUFFER_SIZE]; - - objs.clear(); - - BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE); - - ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size()); - Buffer buf(tmp, inMsg.getContent().size()); - - while (buf.available() > 8) { // 8 == qmf v1 header size - BOOST_CHECK_EQUAL(buf.getOctet(), 'A'); - BOOST_CHECK_EQUAL(buf.getOctet(), 'M'); - BOOST_CHECK_EQUAL(buf.getOctet(), '2'); - BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication - // @@todo: kag: how do we skip 'i' entries??? - buf.getLong(); // ignore sequence - - std::string str1; // decode content body as string - buf.getRawData(str1, objLen); - - TestObjectPtr fake(new _qmf::TestObject(0,0)); - fake->readProperties( str1 ); - objs.push_back(fake); - } - } - - - // decode a V2 Content Indication message - // - void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs) - { - objs.clear(); +namespace tests { + +namespace _qmf = qmf::org::apache::qpid::broker::mgmt::test; +namespace { + +typedef boost::shared_ptr<_qmf::TestObject> TestObjectPtr; +typedef std::vector<TestObjectPtr> TestObjectVector; + +// Instantiates a broker and its internal management agent. Provides +// factories for constructing Receivers for object indication messages. +// +class AgentFixture +{ + MessagingFixture *mFix; + + public: + AgentFixture( unsigned int pubInterval=10, + bool qmfV2=false, + qpid::broker::Broker::Options opts = qpid::broker::Broker::Options()) + { + opts.enableMgmt=true; + opts.qmf2Support=qmfV2; + opts.mgmtPubInterval=pubInterval; + mFix = new MessagingFixture(opts, true); + + _qmf::TestObject::registerSelf(getBrokerAgent()); + }; + ~AgentFixture() + { + delete mFix; + }; + ::qpid::management::ManagementAgent *getBrokerAgent() { return mFix->broker->getManagementAgent(); } + Receiver createV1DataIndRcvr( const std::string package, const std::string klass ) + { + return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " + "node: {type: queue, " + "x-bindings: [{exchange: qpid.management, " + "key: 'console.obj.1.0.") + + package + std::string(".") + klass + + std::string("'}]}}")); + }; + Receiver createV2DataIndRcvr( const std::string package, const std::string klass ) + { + std::string p(package); + std::replace(p.begin(), p.end(), '.', '_'); + std::string k(klass); + std::replace(k.begin(), k.end(), '.', '_'); + + return mFix->session.createReceiver(std::string("kqueue; {create: always, delete: always, " + "node: {type: queue, " + "x-bindings: [{exchange: qmf.default.topic, " + "key: 'agent.ind.data.") + + p + std::string(".") + k + + std::string("'}]}}")); + }; +}; + + +// A "management object" that supports the TestObject +// +class TestManageable : public qpid::management::Manageable +{ + management::ManagementObject::shared_ptr mgmtObj; + const std::string key; + public: + TestManageable(management::ManagementAgent *agent, std::string _key) + : key(_key) + { + _qmf::TestObject::shared_ptr tmp(new _qmf::TestObject(agent, this)); + + // seed it with some default values... + tmp->set_string1(key); + tmp->set_bool1(true); + qpid::types::Variant::Map vMap; + vMap["one"] = qpid::types::Variant(1); + vMap["two"] = qpid::types::Variant("two"); + vMap["three"] = qpid::types::Variant("whatever"); + tmp->set_map1(vMap); + + mgmtObj = tmp; + }; + ~TestManageable() { mgmtObj.reset(); } + management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObj; }; + static void validateTestObjectProperties(_qmf::TestObject& to) + { + // verify the default values are as expected. We don't check 'string1', + // as it is the object key, and is unique for each object (no default value). + BOOST_CHECK(to.get_bool1() == true); + BOOST_CHECK(to.get_map1().size() == 3); + qpid::types::Variant::Map mappy = to.get_map1(); + BOOST_CHECK(1 == (unsigned int)mappy["one"]); + BOOST_CHECK(mappy["two"].asString() == std::string("two")); + BOOST_CHECK(mappy["three"].asString() == std::string("whatever")); + }; +}; + + +// decode a V1 Content Indication message +// +void decodeV1ObjectUpdates(const Message& inMsg, TestObjectVector& objs, const size_t objLen) +{ + const size_t MAX_BUFFER_SIZE=65536; + char tmp[MAX_BUFFER_SIZE]; + + objs.clear(); + + BOOST_CHECK(inMsg.getContent().size() <= MAX_BUFFER_SIZE); + + ::memcpy(tmp, inMsg.getContent().data(), inMsg.getContent().size()); + Buffer buf(tmp, inMsg.getContent().size()); + + while (buf.available() > 8) { // 8 == qmf v1 header size + BOOST_CHECK_EQUAL(buf.getOctet(), 'A'); + BOOST_CHECK_EQUAL(buf.getOctet(), 'M'); + BOOST_CHECK_EQUAL(buf.getOctet(), '2'); + BOOST_CHECK_EQUAL(buf.getOctet(), 'c'); // opcode == content indication + // @@todo: kag: how do we skip 'i' entries??? + buf.getLong(); // ignore sequence + + std::string str1; // decode content body as string + buf.getRawData(str1, objLen); + + TestObjectPtr fake(new _qmf::TestObject(0,0)); + fake->readProperties( str1 ); + objs.push_back(fake); + } +} - BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list")); - const ::qpid::types::Variant::Map& m = inMsg.getProperties(); - Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode")); - BOOST_CHECK(iter != m.end()); - BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication")); +// decode a V2 Content Indication message +// +void decodeV2ObjectUpdates(const qpid::messaging::Message& inMsg, TestObjectVector& objs) +{ + objs.clear(); - Variant::List vList; - ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList); + BOOST_CHECK_EQUAL(inMsg.getContentType(), std::string("amqp/list")); - for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) { - TestObjectPtr fake(new _qmf::TestObject(0,0)); - fake->readTimestamps(lIter->asMap()); - fake->mapDecodeValues((lIter->asMap())["_values"].asMap()); - objs.push_back(fake); - } - } - } + const ::qpid::types::Variant::Map& m = inMsg.getProperties(); + Variant::Map::const_iterator iter = m.find(std::string("qmf.opcode")); + BOOST_CHECK(iter != m.end()); + BOOST_CHECK_EQUAL(iter->second.asString(), std::string("_data_indication")); - QPID_AUTO_TEST_SUITE(BrokerMgmtAgent) + Variant::List vList; + ::qpid::amqp_0_10::ListCodec::decode(inMsg.getContent(), vList); - // verify that an object that is added to the broker's management database is - // published correctly. Furthermore, verify that it is published once after - // it has been deleted. - // - QPID_AUTO_TEST_CASE(v1ObjPublish) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + for (Variant::List::iterator lIter = vList.begin(); lIter != vList.end(); lIter++) { + TestObjectPtr fake(new _qmf::TestObject(0,0)); + fake->readTimestamps(lIter->asMap()); + fake->mapDecodeValues((lIter->asMap())["_values"].asMap()); + objs.push_back(fake); + } +} +} - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("obj1")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); +QPID_AUTO_TEST_SUITE(BrokerMgmtAgent) - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); +// verify that an object that is added to the broker's management database is +// published correctly. Furthermore, verify that it is published once after +// it has been deleted. +// +QPID_AUTO_TEST_CASE(v1ObjPublish) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - agent->addObject(tm->GetManagementObject(), 1); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("obj1")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + agent->addObject(tm->GetManagementObject(), 1); - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - TestManageable::validateTestObjectProperties(**oIter); + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - // destroy the object + TestManageable::validateTestObjectProperties(**oIter); - tm->GetManagementObject()->resourceDestroy(); + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); // not deleted + } - // wait for the deleted object to be published + // destroy the object - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + tm->GetManagementObject()->resourceDestroy(); - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // wait for the deleted object to be published - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - TestManageable::validateTestObjectProperties(**oIter); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - BOOST_CHECK(isDeleted); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } - // Repeat the previous test, but with V2-based object support - // - QPID_AUTO_TEST_CASE(v2ObjPublish) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + BOOST_CHECK(isDeleted); - TestManageable *tm = new TestManageable(agent, std::string("obj2")); + r1.close(); + delete fix; + delete tm; +} - Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); +// Repeat the previous test, but with V2-based object support +// +QPID_AUTO_TEST_CASE(v2ObjPublish) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - agent->addObject(tm->GetManagementObject(), "testobj-1"); + TestManageable *tm = new TestManageable(agent, std::string("obj2")); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); + Receiver r1 = fix->createV2DataIndRcvr(tm->GetManagementObject()->getPackageName(), "#"); - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + agent->addObject(tm->GetManagementObject(), "testobj-1"); - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - TestManageable::validateTestObjectProperties(**oIter); + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - // destroy the object + TestManageable::validateTestObjectProperties(**oIter); - tm->GetManagementObject()->resourceDestroy(); + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + BOOST_CHECK(0 == mappy["_delete_ts"].asUint64()); + } - // wait for the deleted object to be published + // destroy the object - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + tm->GetManagementObject()->resourceDestroy(); - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + // wait for the deleted object to be published - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - TestManageable::validateTestObjectProperties(**oIter); + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - BOOST_CHECK(isDeleted); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that a deleted object is exported correctly using the - // exportDeletedObjects() method. V1 testcase. - // - QPID_AUTO_TEST_CASE(v1ExportDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("myObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + r1.close(); + delete fix; + delete tm; +} - agent->addObject(tm->GetManagementObject(), 1); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); +// verify that a deleted object is exported correctly using the +// exportDeletedObjects() method. V1 testcase. +// +QPID_AUTO_TEST_CASE(v1ExportDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("myObj")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // destroy the object, then immediately export (before the next poll cycle) + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); + agent->addObject(tm->GetManagementObject(), 1); - // wait for the deleted object to be published + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // destroy the object, then immediately export (before the next poll cycle) - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - TestManageable::validateTestObjectProperties(**oIter); + // wait for the deleted object to be published - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - BOOST_CHECK(isDeleted); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; - delete tm; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that a deleted object is imported correctly using the - // importDeletedObjects() method. V1 testcase. - // - QPID_AUTO_TEST_CASE(v1ImportDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + // verify there are no deleted objects to export now. - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("anObj")); - uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + r1.close(); + delete fix; + delete tm; +} - agent->addObject(tm->GetManagementObject(), 1); - // wait for the object to be published - Message m1; - BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); +// verify that a deleted object is imported correctly using the +// importDeletedObjects() method. V1 testcase. +// +QPID_AUTO_TEST_CASE(v1ImportDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("anObj")); + uint32_t objLen = tm->GetManagementObject()->writePropertiesSize(); - // destroy the object, then immediately export (before the next poll cycle) + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); + agent->addObject(tm->GetManagementObject(), 1); - // destroy the broker, and reinistantiate a new one without populating it - // with a TestObject. + // wait for the object to be published + Message m1; + BOOST_CHECK(r1.fetch(m1, Duration::SECOND * 6)); - r1.close(); - delete fix; - delete tm; // should no longer be necessary + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - fix = new AgentFixture(3); - r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + // destroy the object, then immediately export (before the next poll cycle) - // wait for the deleted object to be published + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - bool isDeleted = false; - while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { + // destroy the broker, and reinistantiate a new one without populating it + // with a TestObject. - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + r1.close(); + delete fix; + delete tm; // should no longer be necessary - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + fix = new AgentFixture(3); + r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - TestManageable::validateTestObjectProperties(**oIter); + // wait for the deleted object to be published - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - isDeleted = true; - } - } + bool isDeleted = false; + while (!isDeleted && r1.fetch(m1, Duration::SECOND * 6)) { - BOOST_CHECK(isDeleted); + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + isDeleted = true; } + } + BOOST_CHECK(isDeleted); - // verify that an object that is added and deleted prior to the - // first poll cycle is accounted for by the export - // - QPID_AUTO_TEST_CASE(v1ExportFastDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); + // verify there are no deleted objects to export now. - // create a manageable test object - TestManageable *tm = new TestManageable(agent, std::string("objectifyMe")); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // add, then immediately delete and export the object... + r1.close(); + delete fix; +} - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->addObject(tm->GetManagementObject(), 999); - tm->GetManagementObject()->resourceDestroy(); - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 1); - delete fix; - delete tm; - } +// verify that an object that is added and deleted prior to the +// first poll cycle is accounted for by the export +// +QPID_AUTO_TEST_CASE(v1ExportFastDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + // create a manageable test object + TestManageable *tm = new TestManageable(agent, std::string("objectifyMe")); - // Verify that we can export and import multiple deleted objects correctly. - // - QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) - { - AgentFixture* fix = new AgentFixture(3); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - - // populate the agent with multiple test objects - const size_t objCount = 50; - std::vector<TestManageable *> tmv; - uint32_t objLen; - - for (size_t i = 0; i < objCount; i++) { - std::stringstream key; - key << "testobj-" << std::setfill('x') << std::setw(4) << i; - // (no, seriously, I didn't just do that.) - // Note well: we have to keep the key string length EXACTLY THE SAME - // FOR ALL OBJECTS, so objLen will be the same. Otherwise the - // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). - TestManageable *tm = new TestManageable(agent, key.str()); - objLen = tm->GetManagementObject()->writePropertiesSize(); - agent->addObject(tm->GetManagementObject(), i + 1); - tmv.push_back(tm); - } + // add, then immediately delete and export the object... - // wait for the objects to be published - Message m1; - uint32_t msgCount = 0; - while(r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - msgCount += objs.size(); - } + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->addObject(tm->GetManagementObject(), 999); + tm->GetManagementObject()->resourceDestroy(); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 1); - BOOST_CHECK_EQUAL(msgCount, objCount); + delete fix; + delete tm; +} - // destroy some of the objects, then immediately export (before the next poll cycle) - uint32_t delCount = 0; - for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); - delCount++; - } +// Verify that we can export and import multiple deleted objects correctly. +// +QPID_AUTO_TEST_CASE(v1ImportMultiDelObj) +{ + AgentFixture* fix = new AgentFixture(3); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + Receiver r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + + // populate the agent with multiple test objects + const size_t objCount = 50; + std::vector<TestManageable *> tmv; + uint32_t objLen; + + for (size_t i = 0; i < objCount; i++) { + std::stringstream key; + key << "testobj-" << std::setfill('x') << std::setw(4) << i; + // (no, seriously, I didn't just do that.) + // Note well: we have to keep the key string length EXACTLY THE SAME + // FOR ALL OBJECTS, so objLen will be the same. Otherwise the + // decodeV1ObjectUpdates() will fail (v1 lacks explict encoded length). + TestManageable *tm = new TestManageable(agent, key.str()); + objLen = tm->GetManagementObject()->writePropertiesSize(); + agent->addObject(tm->GetManagementObject(), i + 1); + tmv.push_back(tm); + } - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK_EQUAL(delObjs.size(), delCount); + // wait for the objects to be published + Message m1; + uint32_t msgCount = 0; + while(r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + msgCount += objs.size(); + } - // destroy the broker, and reinistantiate a new one without populating it - // with TestObjects. + BOOST_CHECK_EQUAL(msgCount, objCount); - r1.close(); - delete fix; - while (tmv.size()) { - delete tmv.back(); - tmv.pop_back(); - } + // destroy some of the objects, then immediately export (before the next poll cycle) - fix = new AgentFixture(3); - r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + uint32_t delCount = 0; + for (size_t i = 0; i < objCount; i += 2) { + tmv[i]->GetManagementObject()->resourceDestroy(); + delCount++; + } - // wait for the deleted object to be published, verify the count + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK_EQUAL(delObjs.size(), delCount); - uint32_t countDels = 0; - while (r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV1ObjectUpdates(m1, objs, objLen); - BOOST_CHECK(objs.size() > 0); + // destroy the broker, and reinistantiate a new one without populating it + // with TestObjects. - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + r1.close(); + delete fix; + while (tmv.size()) { + delete tmv.back(); + tmv.pop_back(); + } - TestManageable::validateTestObjectProperties(**oIter); + fix = new AgentFixture(3); + r1 = fix->createV1DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - countDels++; - } - } + // wait for the deleted object to be published, verify the count - // make sure we get the correct # of deleted objects - BOOST_CHECK_EQUAL(countDels, delCount); + uint32_t countDels = 0; + while (r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV1ObjectUpdates(m1, objs, objLen); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + countDels++; } + } - // Verify that we can export and import multiple deleted objects correctly. - // QMF V2 variant - QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - - // populate the agent with multiple test objects - const size_t objCount = 50; - std::vector<TestManageable *> tmv; - - for (size_t i = 0; i < objCount; i++) { - std::stringstream key; - key << "testobj-" << i; - TestManageable *tm = new TestManageable(agent, key.str()); - if (tm->GetManagementObject()->writePropertiesSize()) {} - agent->addObject(tm->GetManagementObject(), key.str()); - tmv.push_back(tm); - } + // make sure we get the correct # of deleted objects + BOOST_CHECK_EQUAL(countDels, delCount); - // wait for the objects to be published - Message m1; - uint32_t msgCount = 0; - while(r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - msgCount += objs.size(); - } + // verify there are no deleted objects to export now. - BOOST_CHECK_EQUAL(msgCount, objCount); + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // destroy some of the objects, then immediately export (before the next poll cycle) + r1.close(); + delete fix; +} - uint32_t delCount = 0; - for (size_t i = 0; i < objCount; i += 2) { - tmv[i]->GetManagementObject()->resourceDestroy(); - delCount++; - } +// Verify that we can export and import multiple deleted objects correctly. +// QMF V2 variant +QPID_AUTO_TEST_CASE(v2ImportMultiDelObj) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + Receiver r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + + // populate the agent with multiple test objects + const size_t objCount = 50; + std::vector<TestManageable *> tmv; + + for (size_t i = 0; i < objCount; i++) { + std::stringstream key; + key << "testobj-" << i; + TestManageable *tm = new TestManageable(agent, key.str()); + if (tm->GetManagementObject()->writePropertiesSize()) {} + agent->addObject(tm->GetManagementObject(), key.str()); + tmv.push_back(tm); + } - ::qpid::management::ManagementAgent::DeletedObjectList delObjs; - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK_EQUAL(delObjs.size(), delCount); + // wait for the objects to be published + Message m1; + uint32_t msgCount = 0; + while(r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + msgCount += objs.size(); + } - // destroy the broker, and reinistantiate a new one without populating it - // with TestObjects. + BOOST_CHECK_EQUAL(msgCount, objCount); - r1.close(); - delete fix; - while (tmv.size()) { - delete tmv.back(); - tmv.pop_back(); - } + // destroy some of the objects, then immediately export (before the next poll cycle) - fix = new AgentFixture(3, true); - r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); - agent = fix->getBrokerAgent(); - agent->importDeletedObjects( delObjs ); + uint32_t delCount = 0; + for (size_t i = 0; i < objCount; i += 2) { + tmv[i]->GetManagementObject()->resourceDestroy(); + delCount++; + } - // wait for the deleted object to be published, verify the count + ::qpid::management::ManagementAgent::DeletedObjectList delObjs; + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK_EQUAL(delObjs.size(), delCount); - uint32_t countDels = 0; - while (r1.fetch(m1, Duration::SECOND * 6)) { - TestObjectVector objs; - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); + // destroy the broker, and reinistantiate a new one without populating it + // with TestObjects. - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + r1.close(); + delete fix; + while (tmv.size()) { + delete tmv.back(); + tmv.pop_back(); + } - TestManageable::validateTestObjectProperties(**oIter); + fix = new AgentFixture(3, true); + r1 = fix->createV2DataIndRcvr("org.apache.qpid.broker.mgmt.test", "#"); + agent = fix->getBrokerAgent(); + agent->importDeletedObjects( delObjs ); - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) - countDels++; - } - } + // wait for the deleted object to be published, verify the count - // make sure we get the correct # of deleted objects - BOOST_CHECK_EQUAL(countDels, delCount); + uint32_t countDels = 0; + while (r1.fetch(m1, Duration::SECOND * 6)) { + TestObjectVector objs; + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); - // verify there are no deleted objects to export now. + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - agent->exportDeletedObjects( delObjs ); - BOOST_CHECK(delObjs.size() == 0); + TestManageable::validateTestObjectProperties(**oIter); - r1.close(); - delete fix; + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) + countDels++; } + } - // See QPID-2997 - QPID_AUTO_TEST_CASE(v2RapidRestoreObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // two objects, same ObjID - TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); - TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); - - // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObject(), "testobj-1"); - tm1->GetManagementObject()->resourceDestroy(); - agent->addObject(tm2->GetManagementObject(), "testobj-1"); - - // expect: a delete notification, then an update notification - TestObjectVector objs; - bool isDeleted = false; - bool isAdvertised = false; - size_t count = 0; - Message m1; - while (r1.fetch(m1, Duration::SECOND * 6)) { - - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - count++; - TestManageable::validateTestObjectProperties(**oIter); - - qpid::types::Variant::Map mappy; - (*oIter)->writeTimestamps(mappy); - if (mappy["_delete_ts"].asUint64() != 0) { - isDeleted = true; - BOOST_CHECK(isAdvertised == false); // delete must be first - } else { - isAdvertised = true; - BOOST_CHECK(isDeleted == true); // delete must be first - } - } - } + // make sure we get the correct # of deleted objects + BOOST_CHECK_EQUAL(countDels, delCount); - BOOST_CHECK(isDeleted); - BOOST_CHECK(isAdvertised); - BOOST_CHECK(count == 2); + // verify there are no deleted objects to export now. - r1.close(); - delete fix; - delete tm1; - delete tm2; - } + agent->exportDeletedObjects( delObjs ); + BOOST_CHECK(delObjs.size() == 0); - // See QPID-2997 - QPID_AUTO_TEST_CASE(v2DuplicateErrorObj) - { - AgentFixture* fix = new AgentFixture(3, true); - management::ManagementAgent* agent; - agent = fix->getBrokerAgent(); - - // turn off the expected error log message - qpid::log::Options logOpts; - logOpts.selectors.clear(); - logOpts.selectors.push_back("critical+"); - qpid::log::Logger::instance().configure(logOpts); - - // two objects, same ObjID - TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); - TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); - // Keep a pointer to the ManagementObject. This test simulates a user-caused error - // case (duplicate objects) where the broker has no choice but to leak a management - // object (safest assumption). To prevent valgrind from flagging this leak, we - // manually clean up the object at the end of the test. - management::ManagementObject *save = tm2->GetManagementObject(); - - Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); - - // add, then immediately delete and re-add a copy of the object - agent->addObject(tm1->GetManagementObject(), "testobj-1"); - agent->addObject(tm2->GetManagementObject(), "testobj-1"); - - TestObjectVector objs; - size_t count = 0; - Message m1; - while (r1.fetch(m1, Duration::SECOND * 6)) { - - decodeV2ObjectUpdates(m1, objs); - BOOST_CHECK(objs.size() > 0); - - for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { - count++; - TestManageable::validateTestObjectProperties(**oIter); - } + r1.close(); + delete fix; +} + +// See QPID-2997 +QPID_AUTO_TEST_CASE(v2RapidRestoreObj) +{ + AgentFixture* fix = new AgentFixture(3, true); + management::ManagementAgent* agent; + agent = fix->getBrokerAgent(); + + // two objects, same ObjID + TestManageable *tm1 = new TestManageable(agent, std::string("obj2")); + TestManageable *tm2 = new TestManageable(agent, std::string("obj2")); + + Receiver r1 = fix->createV2DataIndRcvr(tm1->GetManagementObject()->getPackageName(), "#"); + + // add, then immediately delete and re-add a copy of the object + agent->addObject(tm1->GetManagementObject(), "testobj-1"); + tm1->GetManagementObject()->resourceDestroy(); + agent->addObject(tm2->GetManagementObject(), "testobj-1"); + + // expect: a delete notification, then an update notification + TestObjectVector objs; + bool isDeleted = false; + bool isAdvertised = false; + size_t count = 0; + Message m1; + while (r1.fetch(m1, Duration::SECOND * 6)) { + + decodeV2ObjectUpdates(m1, objs); + BOOST_CHECK(objs.size() > 0); + + for (TestObjectVector::iterator oIter = objs.begin(); oIter != objs.end(); oIter++) { + count++; + TestManageable::validateTestObjectProperties(**oIter); + + qpid::types::Variant::Map mappy; + (*oIter)->writeTimestamps(mappy); + if (mappy["_delete_ts"].asUint64() != 0) { + isDeleted = true; + BOOST_CHECK(isAdvertised == false); // delete must be first + } else { + isAdvertised = true; + BOOST_CHECK(isDeleted == true); // delete must be first } + } + } - BOOST_CHECK(count == 1); // only one should be accepted. + BOOST_CHECK(isDeleted); + BOOST_CHECK(isAdvertised); + BOOST_CHECK(count == 2); - r1.close(); - delete fix; - delete tm1; - delete tm2; - delete save; - } + r1.close(); + delete fix; + delete tm1; + delete tm2; +} - QPID_AUTO_TEST_SUITE_END() - } +QPID_AUTO_TEST_SUITE_END() +} } diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp index e6010a8e00..d538a8181c 100644 --- a/qpid/cpp/src/tests/testagent.cpp +++ b/qpid/cpp/src/tests/testagent.cpp @@ -59,7 +59,7 @@ class CoreClass : public Manageable { string name; ManagementAgent* agent; - _qmf::Parent* mgmtObject; + _qmf::Parent::shared_ptr mgmtObject; std::vector<ChildClass*> children; Mutex vectorLock; @@ -68,7 +68,7 @@ public: CoreClass(ManagementAgent* agent, string _name); ~CoreClass() { mgmtObject->resourceDestroy(); } - ManagementObject* GetManagementObject(void) const + ManagementObject::shared_ptr GetManagementObject(void) const { return mgmtObject; } void doLoop(); @@ -78,14 +78,14 @@ public: class ChildClass : public Manageable { string name; - _qmf::Child* mgmtObject; + _qmf::Child::shared_ptr mgmtObject; public: ChildClass(ManagementAgent* agent, CoreClass* parent, string name); ~ChildClass() { mgmtObject->resourceDestroy(); } - ManagementObject* GetManagementObject(void) const + ManagementObject::shared_ptr GetManagementObject(void) const { return mgmtObject; } void doWork() @@ -97,9 +97,9 @@ public: CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) { static uint64_t persistId = 0x111222333444555LL; - mgmtObject = new _qmf::Parent(agent, this, name); + mgmtObject = _qmf::Parent::shared_ptr(new _qmf::Parent(agent, this, name)); - agent->addObject(mgmtObject, persistId++); + agent->addObject(mgmtObject.get(), persistId++); mgmtObject->set_state("IDLE"); } @@ -146,9 +146,9 @@ Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) { - mgmtObject = new _qmf::Child(agent, this, parent, name); + mgmtObject = _qmf::Child::shared_ptr(new _qmf::Child(agent, this, parent, name)); - agent->addObject(mgmtObject); + agent->addObject(mgmtObject.get()); } |
