summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-11-23 13:37:42 +0000
committerAlan Conway <aconway@apache.org>2007-11-23 13:37:42 +0000
commita3aaa263858f07d37e5860136300f76fab8d7ecd (patch)
treef188402580f36e06113a92c3c74575d13040c1d0 /cpp/src/qpid/broker/Queue.cpp
parentcb070d9813e4232b4ec8409ca555b529ee5cee4b (diff)
downloadqpid-python-a3aaa263858f07d37e5860136300f76fab8d7ecd.tar.gz
QPID-689 from tross@redhat.com.
This patch introduces formal schema specification for management and code generation for management classes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@597662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp69
1 files changed, 46 insertions, 23 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 41a5767457..376b9367d0 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -58,7 +58,7 @@ Queue::Queue(const string& _name, bool _autodelete,
if (parent != 0)
{
mgmtObject = management::Queue::shared_ptr
- (new management::Queue (this, parent, _name, _store != 0, _autodelete));
+ (new management::Queue (this, parent, _name, _store != 0, _autodelete, 0));
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
agent->addObject (mgmtObject);
@@ -92,11 +92,21 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize ());
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ }
}else {
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -108,8 +118,15 @@ void Queue::deliver(intrusive_ptr<Message>& msg){
void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), management::MSG_MASK_PERSIST);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ }
+
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -118,15 +135,19 @@ void Queue::recover(intrusive_ptr<Message>& msg){
}
void Queue::process(intrusive_ptr<Message>& msg){
-
- uint32_t mask = management::MSG_MASK_TX;
-
- if (msg->isPersistent ())
- mask |= management::MSG_MASK_PERSIST;
-
push(msg);
- if (mgmtObject != 0)
- mgmtObject->enqueue (msg->contentSize (), mask);
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgTxnEnqueues ();
+ mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ mgmtObject->inc_msgDepth ();
+ mgmtObject->inc_byteDepth (msg->contentSize ());
+ if (msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ }
+ }
serializer.execute(dispatchCallback);
}
@@ -309,7 +330,7 @@ void Queue::consume(Consumer::ptr c, bool requestExclusive){
}
if (mgmtObject != 0){
- mgmtObject->incConsumers ();
+ mgmtObject->inc_consumers ();
}
}
@@ -321,7 +342,7 @@ void Queue::cancel(Consumer::ptr c){
cancel(c, browsers);
}
if (mgmtObject != 0){
- mgmtObject->decConsumers ();
+ mgmtObject->dec_consumers ();
}
if(exclusive == c) exclusive.reset();
}
@@ -341,12 +362,14 @@ QueuedMessage Queue::dequeue(){
msg = messages.front();
pop();
if (mgmtObject != 0){
- uint32_t mask = 0;
-
- if (msg.payload->isPersistent ())
- mask |= management::MSG_MASK_PERSIST;
-
- mgmtObject->dequeue (msg.payload->contentSize (), mask);
+ mgmtObject->inc_msgTotalDequeues ();
+ //mgmtObject->inc_byteTotalDequeues (msg->contentSize ());
+ mgmtObject->dec_msgDepth ();
+ //mgmtObject->dec_byteDepth (msg->contentSize ());
+ if (0){//msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistDequeues ();
+ //mgmtObject->inc_bytePersistDequeues (msg->contentSize ());
+ }
}
}
return msg;