diff options
| author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
|---|---|---|
| committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-11-13 00:34:09 +0000 |
| commit | b4a562164bfbf627c9bf9e802ea2baa33d12521a (patch) | |
| tree | e3e1d0ed46174cf61e15569659c97a3c93ac6b97 /cpp/src/qpid/broker/Queue.cpp | |
| parent | 6128b62ed47c825dba3f7a36ccdb60b55044ea2e (diff) | |
| download | qpid-python-b4a562164bfbf627c9bf9e802ea2baa33d12521a.tar.gz | |
Patch QPID-680 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594364 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 42 |
1 files changed, 35 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 5745c85331..919343b152 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -36,10 +36,15 @@ using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, - const ConnectionToken* const _owner) : + const ConnectionToken* const _owner, + Manageable* parent) : name(_name), autodelete(_autodelete), @@ -50,9 +55,21 @@ Queue::Queue(const string& _name, bool _autodelete, serializer(false), dispatchCallback(*this) { + if (parent != 0) + { + mgmtObject = management::Queue::shared_ptr + (new management::Queue (this, parent, _name, _store != 0, _autodelete)); + + ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); + agent->addObject (mgmtObject); + } } -Queue::~Queue(){} +Queue::~Queue() +{ + if (mgmtObject.get () != 0) + mgmtObject->resourceDestroy (); +} void Queue::notifyDurableIOComplete() { @@ -79,7 +96,7 @@ void Queue::deliver(Message::shared_ptr& msg){ mgmtObject->enqueue (msg->contentSize ()); }else { if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); push(msg); } QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]"); @@ -92,7 +109,7 @@ void Queue::recover(Message::shared_ptr& msg){ push(msg); msg->enqueueComplete(); // mark the message as enqueued if (mgmtObject != 0) - mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST); + mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST); if (store && !msg->isContentLoaded()) { //content has not been loaded, need to ensure that lazy loading mode is set: //TODO: find a nicer way to do this @@ -102,10 +119,10 @@ void Queue::recover(Message::shared_ptr& msg){ void Queue::process(Message::shared_ptr& msg){ - uint32_t mask = MSG_MASK_TX; + uint32_t mask = management::MSG_MASK_TX; if (msg->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; push(msg); if (mgmtObject != 0) @@ -327,7 +344,7 @@ QueuedMessage Queue::dequeue(){ uint32_t mask = 0; if (msg.payload->isPersistent ()) - mask |= MSG_MASK_PERSIST; + mask |= management::MSG_MASK_PERSIST; mgmtObject->dequeue (msg.payload->contentSize (), mask); } @@ -571,3 +588,14 @@ void Queue::DispatchFunctor::operator()() if (sync) sync->completed(); } + +ManagementObject::shared_ptr Queue::GetManagementObject (void) const +{ + return dynamic_pointer_cast<ManagementObject> (mgmtObject); +} + +Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/, + Args& /*args*/) +{ + return Manageable::STATUS_OK; +} |
