summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/ManagementObjectQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/ManagementObjectQueue.h')
-rw-r--r--cpp/src/qpid/broker/ManagementObjectQueue.h179
1 files changed, 179 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h
new file mode 100644
index 0000000000..989d10f8c0
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementObjectQueue.h
@@ -0,0 +1,179 @@
+#ifndef _ManagementObjectQueue_
+#define _ManagementObjectQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid {
+namespace broker {
+
+const uint32_t MSG_MASK_TX = 1; // Transactional message
+const uint32_t MSG_MASK_PERSIST = 2; // Persistent message
+
+class ManagementObjectQueue : public ManagementObject
+{
+ private:
+
+ static bool schemaNeeded;
+
+ std::string objectName;
+ std::string name;
+ bool durable;
+ bool autoDelete;
+
+ uint64_t msgTotalEnqueues; // Total messages enqueued
+ uint64_t msgTotalDequeues; // Total messages dequeued
+ uint64_t msgTxEnqueues; // Transactional messages enqueued
+ uint64_t msgTxDequeues; // Transactional messages dequeued
+ uint64_t msgPersistEnqueues; // Persistent messages enqueued
+ uint64_t msgPersistDequeues; // Persistent messages dequeued
+
+ uint32_t msgDepth; // Current size of queue in messages
+ uint32_t msgDepthLow; // Low-water queue size, this interval
+ uint32_t msgDepthHigh; // High-water queue size, this interval
+
+ uint64_t byteTotalEnqueues; // Total messages enqueued
+ uint64_t byteTotalDequeues; // Total messages dequeued
+ uint64_t byteTxEnqueues; // Transactional messages enqueued
+ uint64_t byteTxDequeues; // Transactional messages dequeued
+ uint64_t bytePersistEnqueues; // Persistent messages enqueued
+ uint64_t bytePersistDequeues; // Persistent messages dequeued
+
+ uint32_t byteDepth; // Current size of queue in bytes
+ uint32_t byteDepthLow; // Low-water mark this interval
+ uint32_t byteDepthHigh; // High-water mark this interval
+
+ uint64_t enqueueTxStarts; // Total enqueue transactions started
+ uint64_t enqueueTxCommits; // Total enqueue transactions committed
+ uint64_t enqueueTxRejects; // Total enqueue transactions rejected
+
+ uint32_t enqueueTxCount; // Current pending enqueue transactions
+ uint32_t enqueueTxCountLow; // Low water mark this interval
+ uint32_t enqueueTxCountHigh; // High water mark this interval
+
+ uint64_t dequeueTxStarts; // Total dequeue transactions started
+ uint64_t dequeueTxCommits; // Total dequeue transactions committed
+ uint64_t dequeueTxRejects; // Total dequeue transactions rejected
+
+ uint32_t dequeueTxCount; // Current pending dequeue transactions
+ uint32_t dequeueTxCountLow; // Low water mark this interval
+ uint32_t dequeueTxCountHigh; // High water mark this interval
+
+ uint32_t consumers; // Current consumers on queue
+ uint32_t consumersLow; // Low water mark this interval
+ uint32_t consumersHigh; // High water mark this interval
+
+ uint16_t getObjectType (void) { return OBJECT_QUEUE; }
+ std::string getObjectName (void) { return objectName; }
+ void writeSchema (Buffer& buf);
+ void writeConfig (Buffer& buf);
+ void writeInstrumentation (Buffer& buf);
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+
+ inline void adjustQueueHiLo (void){
+ if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+ if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
+
+ if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+ if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
+ instChanged = true;
+ }
+
+ inline void adjustTxHiLo (void){
+ if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+ if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
+ if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+ if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
+ instChanged = true;
+ }
+
+ inline void adjustConsumerHiLo (void){
+ if (consumers > consumersHigh) consumersHigh = consumers;
+ if (consumers < consumersLow) consumersLow = consumers;
+ instChanged = true;
+ }
+
+ public:
+
+ typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr;
+
+ ManagementObjectQueue (std::string& name, bool durable, bool autoDelete);
+ ~ManagementObjectQueue (void);
+
+ // The following mask contents are used to describe enqueued or dequeued
+ // messages when counting statistics.
+
+ inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalEnqueues++;
+ byteTotalEnqueues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxEnqueues++;
+ byteTxEnqueues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistEnqueues++;
+ bytePersistEnqueues += bytes;
+ }
+
+ msgDepth++;
+ byteDepth += bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalDequeues++;
+ byteTotalDequeues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxDequeues++;
+ byteTxDequeues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistDequeues++;
+ bytePersistDequeues += bytes;
+ }
+
+ msgDepth--;
+ byteDepth -= bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void incConsumers (void){
+ consumers++;
+ adjustConsumerHiLo ();
+ }
+
+ inline void decConsumers (void){
+ consumers--;
+ adjustConsumerHiLo ();
+ }
+};
+
+}}
+
+
+
+#endif /*!_ManagementObjectQueue_*/