summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 00:34:09 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 00:34:09 +0000
commitb4a562164bfbf627c9bf9e802ea2baa33d12521a (patch)
treee3e1d0ed46174cf61e15569659c97a3c93ac6b97 /cpp/src/qpid/broker/Queue.cpp
parent6128b62ed47c825dba3f7a36ccdb60b55044ea2e (diff)
downloadqpid-python-b4a562164bfbf627c9bf9e802ea2baa33d12521a.tar.gz
Patch QPID-680 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594364 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp42
1 files changed, 35 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 5745c85331..919343b152 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -36,10 +36,15 @@
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
+using qpid::management::ManagementAgent;
+using qpid::management::ManagementObject;
+using qpid::management::Manageable;
+using qpid::management::Args;
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
- const ConnectionToken* const _owner) :
+ const ConnectionToken* const _owner,
+ Manageable* parent) :
name(_name),
autodelete(_autodelete),
@@ -50,9 +55,21 @@ Queue::Queue(const string& _name, bool _autodelete,
serializer(false),
dispatchCallback(*this)
{
+ if (parent != 0)
+ {
+ mgmtObject = management::Queue::shared_ptr
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete));
+
+ ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+ agent->addObject (mgmtObject);
+ }
}
-Queue::~Queue(){}
+Queue::~Queue()
+{
+ if (mgmtObject.get () != 0)
+ mgmtObject->resourceDestroy ();
+}
void Queue::notifyDurableIOComplete()
{
@@ -79,7 +96,7 @@ void Queue::deliver(Message::shared_ptr& msg){
mgmtObject->enqueue (msg->contentSize ());
}else {
if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -92,7 +109,7 @@ void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
+ mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -102,10 +119,10 @@ void Queue::recover(Message::shared_ptr& msg){
void Queue::process(Message::shared_ptr& msg){
- uint32_t mask = MSG_MASK_TX;
+ uint32_t mask = management::MSG_MASK_TX;
if (msg->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ mask |= management::MSG_MASK_PERSIST;
push(msg);
if (mgmtObject != 0)
@@ -327,7 +344,7 @@ QueuedMessage Queue::dequeue(){
uint32_t mask = 0;
if (msg.payload->isPersistent ())
- mask |= MSG_MASK_PERSIST;
+ mask |= management::MSG_MASK_PERSIST;
mgmtObject->dequeue (msg.payload->contentSize (), mask);
}
@@ -571,3 +588,14 @@ void Queue::DispatchFunctor::operator()()
if (sync) sync->completed();
}
+
+ManagementObject::shared_ptr Queue::GetManagementObject (void) const
+{
+ return dynamic_pointer_cast<ManagementObject> (mgmtObject);
+}
+
+Manageable::status_t Queue::ManagementMethod (uint32_t /*methodId*/,
+ Args& /*args*/)
+{
+ return Manageable::STATUS_OK;
+}