summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 00:34:09 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-11-13 00:34:09 +0000
commitb4a562164bfbf627c9bf9e802ea2baa33d12521a (patch)
treee3e1d0ed46174cf61e15569659c97a3c93ac6b97 /cpp/src/qpid/management
parent6128b62ed47c825dba3f7a36ccdb60b55044ea2e (diff)
downloadqpid-python-b4a562164bfbf627c9bf9e802ea2baa33d12521a.tar.gz
Patch QPID-680 from tross
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594364 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/Args.h39
-rw-r--r--cpp/src/qpid/management/ArgsBrokerEcho.h38
-rw-r--r--cpp/src/qpid/management/Broker.cpp126
-rw-r--r--cpp/src/qpid/management/Broker.h76
-rw-r--r--cpp/src/qpid/management/Manageable.h67
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp325
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h81
-rw-r--r--cpp/src/qpid/management/ManagementExchange.cpp77
-rw-r--r--cpp/src/qpid/management/ManagementExchange.h61
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp61
-rw-r--r--cpp/src/qpid/management/ManagementObject.h125
-rw-r--r--cpp/src/qpid/management/Queue.cpp189
-rw-r--r--cpp/src/qpid/management/Queue.h184
-rw-r--r--cpp/src/qpid/management/Vhost.cpp57
-rw-r--r--cpp/src/qpid/management/Vhost.h65
15 files changed, 1571 insertions, 0 deletions
diff --git a/cpp/src/qpid/management/Args.h b/cpp/src/qpid/management/Args.h
new file mode 100644
index 0000000000..75d0b4dd70
--- /dev/null
+++ b/cpp/src/qpid/management/Args.h
@@ -0,0 +1,39 @@
+#ifndef _Args_
+#define _Args_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+
+namespace qpid {
+namespace management {
+
+class Args
+{
+ public:
+
+ virtual ~Args (void) = 0;
+
+};
+
+inline Args::~Args (void) {}
+
+}}
+
+#endif /*!_Args_*/
diff --git a/cpp/src/qpid/management/ArgsBrokerEcho.h b/cpp/src/qpid/management/ArgsBrokerEcho.h
new file mode 100644
index 0000000000..ad9d7e0813
--- /dev/null
+++ b/cpp/src/qpid/management/ArgsBrokerEcho.h
@@ -0,0 +1,38 @@
+#ifndef _ArgsBrokerEcho_
+#define _ArgsBrokerEcho_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "Args.h"
+#include <string>
+
+namespace qpid {
+namespace management {
+
+class ArgsBrokerEcho : public Args
+{
+ public:
+ uint32_t io_sequence;
+ std::string io_body;
+};
+
+}}
+
+#endif /*!_ArgsBrokerEcho_*/
diff --git a/cpp/src/qpid/management/Broker.cpp b/cpp/src/qpid/management/Broker.cpp
new file mode 100644
index 0000000000..8626654c43
--- /dev/null
+++ b/cpp/src/qpid/management/Broker.cpp
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+#include "qpid/broker/Broker.h"
+#include "Broker.h"
+#include "ArgsBrokerEcho.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Broker::schemaNeeded = true;
+
+Broker::Broker (Manageable* _core, const Options& _conf) :
+ ManagementObject (_core)
+{
+ broker::Broker::Options& conf = (broker::Broker::Options&) _conf;
+
+ sysId = "sysId";
+ port = conf.port;
+ workerThreads = conf.workerThreads;
+ maxConns = conf.maxConnections;
+ connBacklog = conf.connectionBacklog;
+ stagingThreshold = conf.stagingThreshold;
+ storeLib = conf.store;
+ asyncStore = conf.storeAsync;
+ mgmtPubInterval = conf.mgmtPubInterval;
+ initialDiskPageSize = 0;
+ initialPagesPerQueue = 0;
+ clusterName = "";
+ version = PACKAGE_VERSION;
+}
+
+Broker::~Broker () {}
+
+void Broker::writeSchema (Buffer& buf)
+{
+ schemaNeeded = false;
+
+ schemaListBegin (buf);
+ schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true);
+ schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true);
+ schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
+ schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true);
+ schemaItem (buf, TYPE_UINT16, "connBacklog",
+ "Connection backlog limit for listening socket", true);
+ schemaItem (buf, TYPE_UINT32, "stagingThreshold",
+ "Broker stages messages over this size to disk", true);
+ schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true);
+ schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true);
+ schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
+ schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
+ "Number of disk pages allocated for storage", true);
+ schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
+ "Number of disk pages allocated per queue", true);
+ schemaItem (buf, TYPE_STRING, "clusterName",
+ "Name of cluster this server is a member of, zero-length for standalone server", true);
+ schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
+ schemaListEnd (buf);
+}
+
+void Broker::writeConfig (Buffer& buf)
+{
+ configChanged = false;
+
+ writeTimestamps (buf);
+ buf.putLong (0);
+ buf.putShort (port);
+ buf.putShort (workerThreads);
+ buf.putShort (maxConns);
+ buf.putShort (connBacklog);
+ buf.putLong (stagingThreshold);
+ buf.putShortString (storeLib);
+ buf.putOctet (asyncStore ? 1 : 0);
+ buf.putShort (mgmtPubInterval);
+ buf.putLong (initialDiskPageSize);
+ buf.putLong (initialPagesPerQueue);
+ buf.putShortString (clusterName);
+ buf.putShortString (version);
+}
+
+void Broker::doMethod (string methodName,
+ Buffer& inBuf,
+ Buffer& outBuf)
+{
+ if (methodName.compare ("echo") == 0)
+ {
+ ArgsBrokerEcho args;
+ uint32_t result;
+
+ args.io_sequence = inBuf.getLong ();
+ inBuf.getLongString (args.io_body);
+
+ result = coreObject->ManagementMethod (1, args);
+
+ outBuf.putLong (result);
+ outBuf.putShortString ("OK");
+ outBuf.putLong (args.io_sequence);
+ outBuf.putLongString (args.io_body);
+ }
+ else
+ {
+ outBuf.putLong (1);
+ outBuf.putShortString ("Unknown Method");
+ }
+}
+
diff --git a/cpp/src/qpid/management/Broker.h b/cpp/src/qpid/management/Broker.h
new file mode 100644
index 0000000000..91fddd3724
--- /dev/null
+++ b/cpp/src/qpid/management/Broker.h
@@ -0,0 +1,76 @@
+#ifndef _ManagementBroker_
+#define _ManagementBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+#include "qpid/Options.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid {
+namespace management {
+
+class Broker : public ManagementObject
+{
+ public:
+
+ typedef boost::shared_ptr<Broker> shared_ptr;
+
+ Broker (Manageable* coreObject, const Options& conf);
+ ~Broker (void);
+
+ private:
+
+ static bool schemaNeeded;
+
+ std::string sysId;
+ uint16_t port;
+ uint16_t workerThreads;
+ uint16_t maxConns;
+ uint16_t connBacklog;
+ uint32_t stagingThreshold;
+ std::string storeLib;
+ bool asyncStore;
+ uint16_t mgmtPubInterval;
+ uint32_t initialDiskPageSize;
+ uint32_t initialPagesPerQueue;
+ std::string clusterName;
+ std::string version;
+
+ uint16_t getObjectType (void) { return OBJECT_BROKER; }
+ std::string getObjectName (void) { return "broker"; }
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+ void setSchemaNeeded (void) { schemaNeeded = true; }
+ void doMethod (std::string methodName,
+ qpid::framing::Buffer& inBuf,
+ qpid::framing::Buffer& outBuf);
+
+ inline bool getInstChanged (void) { return false; }
+};
+
+}}
+
+
+#endif /*!_ManagementBroker_*/
diff --git a/cpp/src/qpid/management/Manageable.h b/cpp/src/qpid/management/Manageable.h
new file mode 100644
index 0000000000..7c9b49be9a
--- /dev/null
+++ b/cpp/src/qpid/management/Manageable.h
@@ -0,0 +1,67 @@
+#ifndef _Manageable_
+#define _Manageable_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "ManagementObject.h"
+#include "Args.h"
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qpid {
+namespace management {
+
+class Manageable
+{
+ public:
+
+ virtual ~Manageable (void) = 0;
+
+ // status_t is a type used to pass completion status from the method handler.
+ //
+ typedef uint32_t status_t;
+
+ static const status_t STATUS_OK = 0;
+ static const status_t STATUS_UNKNOWN_OBJECT = 1;
+ static const status_t STATUS_UNKNOWN_METHOD = 2;
+
+ // Every "Manageable" object must hold a reference to exactly one
+ // management object. This object is always of a class derived from
+ // the pure-virtual "ManagementObject".
+ //
+ // This accessor function returns a shared_ptr to the management object.
+ //
+ virtual ManagementObject::shared_ptr GetManagementObject (void) const = 0;
+
+ // Every "Manageable" object must implement ManagementMethod. This
+ // function is called when a remote management client invokes a method
+ // on this object. The input and output arguments are specific to the
+ // method being called and must be down-cast to the appropriate sub class
+ // before use.
+ virtual status_t ManagementMethod (uint32_t methodId, Args& args) = 0;
+};
+
+inline Manageable::~Manageable (void) {}
+
+}}
+
+#endif /*!_Manageable_*/
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
new file mode 100644
index 0000000000..b5b4da09b8
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -0,0 +1,325 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementAgent.h"
+#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+#include <list>
+
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+ManagementAgent::shared_ptr ManagementAgent::agent;
+
+ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+{
+ timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
+ nextObjectId = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+}
+
+ManagementAgent::shared_ptr ManagementAgent::getAgent (void)
+{
+ if (agent.get () == 0)
+ agent = shared_ptr (new ManagementAgent (10));
+
+ return agent;
+}
+
+void ManagementAgent::setExchange (Exchange::shared_ptr _mexchange,
+ Exchange::shared_ptr _dexchange)
+{
+ mExchange = _mexchange;
+ dExchange = _dexchange;
+}
+
+void ManagementAgent::addObject (ManagementObject::shared_ptr object)
+{
+ uint64_t objectId = nextObjectId++;
+
+ object->setObjectId (objectId);
+ managementObjects[objectId] = object;
+}
+
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+
+void ManagementAgent::Periodic::fire ()
+{
+ agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval)));
+ agent.PeriodicProcessing ();
+}
+
+void ManagementAgent::clientAdded (void)
+{
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+ object->setAllChanged ();
+ object->setSchemaNeeded ();
+ }
+}
+
+void ManagementAgent::PeriodicProcessing (void)
+{
+#define BUFSIZE 65536
+#define THRESHOLD 16384
+ char msgChars[BUFSIZE];
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ uint32_t contentSize;
+ std::list<uint64_t> deleteList;
+
+ if (managementObjects.empty ())
+ return;
+
+ Message::shared_ptr msg (new Message ());
+
+ // Build the magic number for the management message.
+ msgBuffer.putOctet ('A');
+ msgBuffer.putOctet ('M');
+ msgBuffer.putOctet ('0');
+ msgBuffer.putOctet ('1');
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr object = iter->second;
+
+ if (object->getSchemaNeeded ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('S'); // opcode = Schema Record
+ msgBuffer.putOctet (0); // content-class = N/A
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeSchema (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->getConfigChanged ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('C'); // content-class = Configuration
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeConfig (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->getInstChanged ())
+ {
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ msgBuffer.putShort (object->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ object->writeInstrumentation (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (object->isDeleted ())
+ deleteList.push_back (iter->first);
+
+ // Temporary protection against buffer overrun.
+ // This needs to be replaced with frame fragmentation.
+ if (msgBuffer.available () < THRESHOLD)
+ break;
+ }
+
+ msgBuffer.putOctet ('X'); // End-of-message
+ msgBuffer.putOctet (0);
+ msgBuffer.putShort (0);
+ msgBuffer.putLong (8);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+
+ AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
+ 0, "qpid.management", 0, 0));
+ AMQFrame header (0, AMQHeaderBody());
+ AMQFrame content;
+
+ content.setBody(AMQContentBody());
+ content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ mExchange->route (deliverable, "mgmt", 0);
+
+ // Delete flagged objects
+ for (std::list<uint64_t>::reverse_iterator iter = deleteList.rbegin ();
+ iter != deleteList.rend ();
+ iter++)
+ {
+ managementObjects.erase (*iter);
+ }
+ deleteList.clear ();
+}
+
+void ManagementAgent::dispatchCommand (Deliverable& deliverable,
+ const string& routingKey,
+ const FieldTable* /*args*/)
+{
+ size_t pos, start;
+ Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
+ uint32_t contentSize;
+
+ if (routingKey.compare (0, 7, "method.") != 0)
+ {
+ QPID_LOG (debug, "Illegal routing key for dispatch: " << routingKey);
+ return;
+ }
+
+ start = 7;
+ if (routingKey.length () == start)
+ {
+ QPID_LOG (debug, "Missing package-name in routing key: " << routingKey);
+ return;
+ }
+
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing class-name in routing key: " << routingKey);
+ return;
+ }
+
+ string packageName = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ pos = routingKey.find ('.', start);
+ if (pos == string::npos || routingKey.length () == pos + 1)
+ {
+ QPID_LOG (debug, "Missing method-name in routing key: " << routingKey);
+ return;
+ }
+
+ string className = routingKey.substr (start, pos - start);
+
+ start = pos + 1;
+ string methodName = routingKey.substr (start, routingKey.length () - start);
+
+ QPID_LOG (debug, "Dispatch package: " << packageName << ", class: "
+ << className << ", method: " << methodName);
+
+ contentSize = msg.encodedContentSize ();
+ if (contentSize < 8 || contentSize > 65536)
+ return;
+
+ char *inMem = new char[contentSize];
+ char outMem[4096]; // TODO Fix This
+ Buffer inBuffer (inMem, contentSize);
+ Buffer outBuffer (outMem, 4096);
+
+ msg.encodeContent (inBuffer);
+ inBuffer.reset ();
+
+ uint32_t methodId = inBuffer.getLong ();
+ uint64_t objId = inBuffer.getLongLong ();
+ string replyTo;
+
+ inBuffer.getShortString (replyTo);
+
+ QPID_LOG (debug, " len = " << contentSize << ", methodId = " <<
+ methodId << ", objId = " << objId);
+
+ outBuffer.putLong (methodId);
+
+ ManagementObjectMap::iterator iter = managementObjects.find (objId);
+ if (iter == managementObjects.end ())
+ {
+ outBuffer.putLong (2);
+ outBuffer.putShortString ("Invalid Object Id");
+ }
+ else
+ {
+ iter->second->doMethod (methodName, inBuffer, outBuffer);
+ }
+
+ Message::shared_ptr outMsg (new Message ());
+ uint32_t msgSize = 4096 - outBuffer.available ();
+ outBuffer.reset ();
+ AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
+ 0, "amq.direct", 0, 0));
+ AMQFrame header (0, AMQHeaderBody());
+ AMQFrame content;
+
+ content.setBody(AMQContentBody());
+ content.castBody<AMQContentBody>()->decode(outBuffer, msgSize);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ outMsg->getFrames().append(method);
+ outMsg->getFrames().append(header);
+
+ MessageProperties* props = outMsg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(msgSize);
+ outMsg->getFrames().append(content);
+
+ DeliverableMessage outDeliverable (outMsg);
+ dExchange->route (outDeliverable, replyTo, 0);
+
+ free (inMem);
+}
+
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
new file mode 100644
index 0000000000..e84c0478f3
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -0,0 +1,81 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "ManagementObject.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgent
+{
+ private:
+
+ ManagementAgent (uint16_t interval);
+
+ public:
+
+ typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+
+ static shared_ptr getAgent (void);
+
+ void setInterval (uint16_t _interval) { interval = _interval; }
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
+ void addObject (ManagementObject::shared_ptr object);
+ void clientAdded (void);
+ void dispatchCommand (broker::Deliverable& msg,
+ const std::string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ private:
+
+ struct Periodic : public broker::TimerTask
+ {
+ ManagementAgent& agent;
+
+ Periodic (ManagementAgent& agent, uint32_t seconds);
+ ~Periodic () {}
+ void fire ();
+ };
+
+ static shared_ptr agent;
+ ManagementObjectMap managementObjects;
+ broker::Timer timer;
+ broker::Exchange::shared_ptr mExchange;
+ broker::Exchange::shared_ptr dExchange;
+ uint16_t interval;
+ uint64_t nextObjectId;
+
+ void PeriodicProcessing (void);
+};
+
+}}
+
+
+
+#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/management/ManagementExchange.cpp b/cpp/src/qpid/management/ManagementExchange.cpp
new file mode 100644
index 0000000000..f18b6fc402
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementExchange.cpp
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementExchange.h"
+#include "qpid/log/Statement.h"
+
+using namespace qpid::management;
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+ManagementExchange::ManagementExchange (const string& _name) :
+ Exchange (_name), TopicExchange(_name) {}
+ManagementExchange::ManagementExchange (const std::string& _name,
+ bool _durable,
+ const FieldTable& _args) :
+ Exchange (_name, _durable, _args),
+ TopicExchange(_name, _durable, _args) {}
+
+
+bool ManagementExchange::bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ bool result = TopicExchange::bind (queue, routingKey, args);
+
+ // Notify the management agent that a new management client has bound to the
+ // exchange.
+ if (result)
+ managementAgent->clientAdded ();
+
+ return result;
+}
+
+void ManagementExchange::route (Deliverable& msg,
+ const string& routingKey,
+ const FieldTable* args)
+{
+ // Intercept management commands
+ if (routingKey.length () > 7 &&
+ routingKey.substr (0, 7).compare ("method.") == 0)
+ {
+ managementAgent->dispatchCommand (msg, routingKey, args);
+ return;
+ }
+
+ TopicExchange::route (msg, routingKey, args);
+}
+
+void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent)
+{
+ managementAgent = agent;
+}
+
+
+ManagementExchange::~ManagementExchange() {}
+
+const std::string ManagementExchange::typeName("management");
+
diff --git a/cpp/src/qpid/management/ManagementExchange.h b/cpp/src/qpid/management/ManagementExchange.h
new file mode 100644
index 0000000000..6ccdf47182
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementExchange.h
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _ManagementExchange_
+#define _ManagementExchange_
+
+#include "qpid/broker/TopicExchange.h"
+#include "ManagementAgent.h"
+
+namespace qpid {
+namespace broker {
+
+class ManagementExchange : public virtual TopicExchange
+{
+ private:
+ management::ManagementAgent::shared_ptr managementAgent;
+
+ public:
+ static const std::string typeName;
+
+ ManagementExchange (const string& name);
+ ManagementExchange (const string& _name, bool _durable,
+ const qpid::framing::FieldTable& _args);
+
+ virtual std::string getType() const { return typeName; }
+
+ virtual bool bind (Queue::shared_ptr queue,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ virtual void route (Deliverable& msg,
+ const string& routingKey,
+ const qpid::framing::FieldTable* args);
+
+ void setManagmentAgent (management::ManagementAgent::shared_ptr agent);
+
+ virtual ~ManagementExchange();
+};
+
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
new file mode 100644
index 0000000000..24588b4edd
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "ManagementObject.h"
+
+using namespace qpid::framing;
+using namespace qpid::management;
+using namespace qpid::sys;
+
+void ManagementObject::schemaItem (Buffer& buf,
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig,
+ bool isIndex)
+{
+ uint8_t flags =
+ (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0);
+
+ buf.putOctet (flags);
+ buf.putOctet (typeCode);
+ buf.putShortString (name);
+ buf.putShortString (description);
+}
+
+void ManagementObject::schemaListBegin (Buffer& buf)
+{
+ schemaItem (buf, TYPE_UINT64, "id", "Object ID", true, true);
+}
+
+void ManagementObject::schemaListEnd (Buffer& buf)
+{
+ buf.putOctet (FLAG_END);
+}
+
+void ManagementObject::writeTimestamps (Buffer& buf)
+{
+ buf.putLongLong (uint64_t (Duration (now ())));
+ buf.putLongLong (createTime);
+ buf.putLongLong (destroyTime);
+ buf.putLongLong (objectId);
+}
diff --git a/cpp/src/qpid/management/ManagementObject.h b/cpp/src/qpid/management/ManagementObject.h
new file mode 100644
index 0000000000..416a477796
--- /dev/null
+++ b/cpp/src/qpid/management/ManagementObject.h
@@ -0,0 +1,125 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <map>
+
+namespace qpid {
+namespace management {
+
+const uint16_t OBJECT_SYSTEM = 1;
+const uint16_t OBJECT_BROKER = 2;
+const uint16_t OBJECT_VHOST = 3;
+const uint16_t OBJECT_QUEUE = 4;
+const uint16_t OBJECT_EXCHANGE = 5;
+const uint16_t OBJECT_BINDING = 6;
+const uint16_t OBJECT_CLIENT = 7;
+const uint16_t OBJECT_SESSION = 8;
+const uint16_t OBJECT_DESTINATION = 9;
+const uint16_t OBJECT_PRODUCER = 10;
+const uint16_t OBJECT_CONSUMER = 11;
+
+class Manageable;
+
+class ManagementObject
+{
+ protected:
+
+ uint64_t createTime;
+ uint64_t destroyTime;
+ uint64_t objectId;
+ bool configChanged;
+ bool instChanged;
+ bool deleted;
+ Manageable* coreObject;
+
+ static const uint8_t TYPE_UINT8 = 1;
+ static const uint8_t TYPE_UINT16 = 2;
+ static const uint8_t TYPE_UINT32 = 3;
+ static const uint8_t TYPE_UINT64 = 4;
+ static const uint8_t TYPE_BOOL = 5;
+ static const uint8_t TYPE_STRING = 6;
+
+ static const uint8_t FLAG_CONFIG = 0x01;
+ static const uint8_t FLAG_INDEX = 0x02;
+ static const uint8_t FLAG_END = 0x80;
+
+ void schemaItem (qpid::framing::Buffer& buf,
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig = false,
+ bool isIndex = false);
+ void schemaListBegin (qpid::framing::Buffer& buf);
+ void schemaListEnd (qpid::framing::Buffer& buf);
+ void writeTimestamps (qpid::framing::Buffer& buf);
+
+ public:
+ typedef boost::shared_ptr<ManagementObject> shared_ptr;
+
+ ManagementObject (Manageable* _core) :
+ destroyTime(0), objectId (0), configChanged(true),
+ instChanged(true), deleted(false), coreObject(_core)
+ { createTime = uint64_t (qpid::sys::Duration (qpid::sys::now ())); }
+ virtual ~ManagementObject () {}
+
+ virtual uint16_t getObjectType (void) = 0;
+ virtual std::string getObjectName (void) = 0;
+ virtual void writeSchema (qpid::framing::Buffer& buf) = 0;
+ virtual void writeConfig (qpid::framing::Buffer& buf) = 0;
+ virtual void writeInstrumentation (qpid::framing::Buffer& buf) = 0;
+ virtual bool getSchemaNeeded (void) = 0;
+ virtual void setSchemaNeeded (void) = 0;
+ virtual void doMethod (std::string methodName,
+ qpid::framing::Buffer& inBuf,
+ qpid::framing::Buffer& outBuf) = 0;
+
+ void setObjectId (uint64_t oid) { objectId = oid; }
+ uint64_t getObjectId (void) { return objectId; }
+ inline bool getConfigChanged (void) { return configChanged; }
+ virtual bool getInstChanged (void) { return instChanged; }
+ inline void setAllChanged (void)
+ {
+ configChanged = true;
+ instChanged = true;
+ }
+
+ inline void resourceDestroy (void) {
+ destroyTime = uint64_t (qpid::sys::Duration (qpid::sys::now ()));
+ deleted = true;
+ }
+ bool isDeleted (void) { return deleted; }
+
+};
+
+ typedef std::map<uint64_t,ManagementObject::shared_ptr> ManagementObjectMap;
+
+}}
+
+
+
+#endif /*!_ManagementObject_*/
diff --git a/cpp/src/qpid/management/Queue.cpp b/cpp/src/qpid/management/Queue.cpp
new file mode 100644
index 0000000000..3c82877ebd
--- /dev/null
+++ b/cpp/src/qpid/management/Queue.cpp
@@ -0,0 +1,189 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Statement.h"
+#include "Manageable.h"
+#include "Queue.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Queue::schemaNeeded = true;
+
+Queue::Queue (Manageable* _core, Manageable* _parent,
+ const std::string& _name,
+ bool _durable, bool _autoDelete) :
+ ManagementObject(_core), name(_name),
+ durable(_durable), autoDelete(_autoDelete)
+{
+ vhostRef = _parent->GetManagementObject ()->getObjectId ();
+
+ msgTotalEnqueues = 0;
+ msgTotalDequeues = 0;
+ msgTxEnqueues = 0;
+ msgTxDequeues = 0;
+ msgPersistEnqueues = 0;
+ msgPersistDequeues = 0;
+
+ msgDepth = 0;
+ msgDepthLow = 0;
+ msgDepthHigh = 0;
+
+ byteTotalEnqueues = 0;
+ byteTotalDequeues = 0;
+ byteTxEnqueues = 0;
+ byteTxDequeues = 0;
+ bytePersistEnqueues = 0;
+ bytePersistDequeues = 0;
+
+ byteDepth = 0;
+ byteDepthLow = 0;
+ byteDepthHigh = 0;
+
+ enqueueTxStarts = 0;
+ enqueueTxCommits = 0;
+ enqueueTxRejects = 0;
+ dequeueTxStarts = 0;
+ dequeueTxCommits = 0;
+ dequeueTxRejects = 0;
+
+ enqueueTxCount = 0;
+ enqueueTxCountLow = 0;
+ enqueueTxCountHigh = 0;
+
+ dequeueTxCount = 0;
+ dequeueTxCountLow = 0;
+ dequeueTxCountHigh = 0;
+
+ consumers = 0;
+ consumersLow = 0;
+ consumersHigh = 0;
+}
+
+Queue::~Queue () {}
+
+void Queue::writeSchema (Buffer& buf)
+{
+ schemaNeeded = false;
+
+ schemaListBegin (buf);
+ schemaItem (buf, TYPE_UINT64, "vhostRef", "Virtual Host Ref", true);
+ schemaItem (buf, TYPE_STRING, "name", "Queue Name", true);
+ schemaItem (buf, TYPE_BOOL, "durable", "Durable", true);
+ schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true);
+ schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "msgTxnEnqueues", "Transactional messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgTxnDequeues", "Transactional messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued");
+ schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages");
+ schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval");
+ schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval");
+ schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "byteTxnEnqueues", "Transactional messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "byteTxnDequeues", "Transactional messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
+ schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes");
+ schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxnStarts", "Total enqueue transactions started ");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxnCommits", "Total enqueue transactions committed");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxnRejects", "Total enqueue transactions rejected");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxnCount", "Current pending enqueue transactions");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxnCountLow", "Low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxnCountHigh", "High water mark this interval");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxnStarts", "Total dequeue transactions started ");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxnCommits", "Total dequeue transactions committed");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxnRejects", "Total dequeue transactions rejected");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxnCount", "Current pending dequeue transactions");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxnCountLow", "Transaction low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxnCountHigh", "Transaction high water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue");
+ schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval");
+ schemaListEnd (buf);
+}
+
+void Queue::writeConfig (Buffer& buf)
+{
+ configChanged = false;
+
+ writeTimestamps (buf);
+ buf.putLongLong (vhostRef);
+ buf.putShortString (name);
+ buf.putOctet (durable ? 1 : 0);
+ buf.putOctet (autoDelete ? 1 : 0);
+}
+
+void Queue::writeInstrumentation (Buffer& buf)
+{
+ instChanged = false;
+
+ writeTimestamps (buf);
+ buf.putLongLong (msgTotalEnqueues);
+ buf.putLongLong (msgTotalDequeues);
+ buf.putLongLong (msgTxEnqueues);
+ buf.putLongLong (msgTxDequeues);
+ buf.putLongLong (msgPersistEnqueues);
+ buf.putLongLong (msgPersistDequeues);
+ buf.putLong (msgDepth);
+ buf.putLong (msgDepthLow);
+ buf.putLong (msgDepthHigh);
+ buf.putLongLong (byteTotalEnqueues);
+ buf.putLongLong (byteTotalDequeues);
+ buf.putLongLong (byteTxEnqueues);
+ buf.putLongLong (byteTxDequeues);
+ buf.putLongLong (bytePersistEnqueues);
+ buf.putLongLong (bytePersistDequeues);
+ buf.putLong (byteDepth);
+ buf.putLong (byteDepthLow);
+ buf.putLong (byteDepthHigh);
+ buf.putLongLong (enqueueTxStarts);
+ buf.putLongLong (enqueueTxCommits);
+ buf.putLongLong (enqueueTxRejects);
+ buf.putLong (enqueueTxCount);
+ buf.putLong (enqueueTxCountLow);
+ buf.putLong (enqueueTxCountHigh);
+ buf.putLongLong (dequeueTxStarts);
+ buf.putLongLong (dequeueTxCommits);
+ buf.putLongLong (dequeueTxRejects);
+ buf.putLong (dequeueTxCount);
+ buf.putLong (dequeueTxCountLow);
+ buf.putLong (dequeueTxCountHigh);
+ buf.putLong (consumers);
+ buf.putLong (consumersLow);
+ buf.putLong (consumersHigh);
+
+ msgDepthLow = msgDepth;
+ msgDepthHigh = msgDepth;
+ byteDepthLow = byteDepth;
+ byteDepthHigh = byteDepth;
+ enqueueTxCountLow = enqueueTxCount;
+ enqueueTxCountHigh = enqueueTxCount;
+ dequeueTxCountLow = dequeueTxCount;
+ dequeueTxCountHigh = dequeueTxCount;
+ consumersLow = consumers;
+ consumersHigh = consumers;
+}
diff --git a/cpp/src/qpid/management/Queue.h b/cpp/src/qpid/management/Queue.h
new file mode 100644
index 0000000000..3a7fdf5263
--- /dev/null
+++ b/cpp/src/qpid/management/Queue.h
@@ -0,0 +1,184 @@
+#ifndef _ManagementQueue_
+#define _ManagementQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid {
+namespace management {
+
+const uint32_t MSG_MASK_TX = 1; // Transactional message
+const uint32_t MSG_MASK_PERSIST = 2; // Persistent message
+
+class Queue : public ManagementObject
+{
+ private:
+
+ static bool schemaNeeded;
+
+ uint64_t vhostRef;
+ std::string name;
+ bool durable;
+ bool autoDelete;
+
+ uint64_t msgTotalEnqueues; // Total messages enqueued
+ uint64_t msgTotalDequeues; // Total messages dequeued
+ uint64_t msgTxEnqueues; // Transactional messages enqueued
+ uint64_t msgTxDequeues; // Transactional messages dequeued
+ uint64_t msgPersistEnqueues; // Persistent messages enqueued
+ uint64_t msgPersistDequeues; // Persistent messages dequeued
+
+ uint32_t msgDepth; // Current size of queue in messages
+ uint32_t msgDepthLow; // Low-water queue size, this interval
+ uint32_t msgDepthHigh; // High-water queue size, this interval
+
+ uint64_t byteTotalEnqueues; // Total messages enqueued
+ uint64_t byteTotalDequeues; // Total messages dequeued
+ uint64_t byteTxEnqueues; // Transactional messages enqueued
+ uint64_t byteTxDequeues; // Transactional messages dequeued
+ uint64_t bytePersistEnqueues; // Persistent messages enqueued
+ uint64_t bytePersistDequeues; // Persistent messages dequeued
+
+ uint32_t byteDepth; // Current size of queue in bytes
+ uint32_t byteDepthLow; // Low-water mark this interval
+ uint32_t byteDepthHigh; // High-water mark this interval
+
+ uint64_t enqueueTxStarts; // Total enqueue transactions started
+ uint64_t enqueueTxCommits; // Total enqueue transactions committed
+ uint64_t enqueueTxRejects; // Total enqueue transactions rejected
+
+ uint32_t enqueueTxCount; // Current pending enqueue transactions
+ uint32_t enqueueTxCountLow; // Low water mark this interval
+ uint32_t enqueueTxCountHigh; // High water mark this interval
+
+ uint64_t dequeueTxStarts; // Total dequeue transactions started
+ uint64_t dequeueTxCommits; // Total dequeue transactions committed
+ uint64_t dequeueTxRejects; // Total dequeue transactions rejected
+
+ uint32_t dequeueTxCount; // Current pending dequeue transactions
+ uint32_t dequeueTxCountLow; // Low water mark this interval
+ uint32_t dequeueTxCountHigh; // High water mark this interval
+
+ uint32_t consumers; // Current consumers on queue
+ uint32_t consumersLow; // Low water mark this interval
+ uint32_t consumersHigh; // High water mark this interval
+
+ uint16_t getObjectType (void) { return OBJECT_QUEUE; }
+ std::string getObjectName (void) { return "queue"; }
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& buf);
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+ void setSchemaNeeded (void) { schemaNeeded = true; }
+ void doMethod (std::string /*methodName*/,
+ qpid::framing::Buffer& /*inBuf*/,
+ qpid::framing::Buffer& /*outBuf*/) {}
+
+ inline void adjustQueueHiLo (void){
+ if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+ if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
+
+ if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+ if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
+ instChanged = true;
+ }
+
+ inline void adjustTxHiLo (void){
+ if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+ if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
+ if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+ if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
+ instChanged = true;
+ }
+
+ inline void adjustConsumerHiLo (void){
+ if (consumers > consumersHigh) consumersHigh = consumers;
+ if (consumers < consumersLow) consumersLow = consumers;
+ instChanged = true;
+ }
+
+ public:
+
+ typedef boost::shared_ptr<Queue> shared_ptr;
+
+ Queue (Manageable* coreObject, Manageable* parentObject,
+ const std::string& name, bool durable, bool autoDelete);
+ ~Queue (void);
+
+ // The following mask contents are used to describe enqueued or dequeued
+ // messages when counting statistics.
+
+ inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalEnqueues++;
+ byteTotalEnqueues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxEnqueues++;
+ byteTxEnqueues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistEnqueues++;
+ bytePersistEnqueues += bytes;
+ }
+
+ msgDepth++;
+ byteDepth += bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalDequeues++;
+ byteTotalDequeues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxDequeues++;
+ byteTxDequeues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistDequeues++;
+ bytePersistDequeues += bytes;
+ }
+
+ msgDepth--;
+ byteDepth -= bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void incConsumers (void){
+ consumers++;
+ adjustConsumerHiLo ();
+ }
+
+ inline void decConsumers (void){
+ consumers--;
+ adjustConsumerHiLo ();
+ }
+};
+
+}}
+
+
+
+#endif /*!_ManagementQueue_*/
diff --git a/cpp/src/qpid/management/Vhost.cpp b/cpp/src/qpid/management/Vhost.cpp
new file mode 100644
index 0000000000..effcb1599c
--- /dev/null
+++ b/cpp/src/qpid/management/Vhost.cpp
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "Vhost.h"
+
+using namespace qpid::management;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool Vhost::schemaNeeded = true;
+
+Vhost::Vhost (Manageable* _core, Manageable* _parent) :
+ ManagementObject (_core), name("/")
+{
+ brokerRef = _parent->GetManagementObject ()->getObjectId ();
+}
+
+Vhost::~Vhost () {}
+
+void Vhost::writeSchema (Buffer& buf)
+{
+ schemaNeeded = false;
+
+ schemaListBegin (buf);
+ schemaItem (buf, TYPE_UINT64, "brokerRef", "Broker Reference" , true);
+ schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true);
+ schemaListEnd (buf);
+}
+
+void Vhost::writeConfig (Buffer& buf)
+{
+ configChanged = false;
+
+ writeTimestamps (buf);
+ buf.putLongLong (brokerRef);
+ buf.putShortString (name);
+}
+
diff --git a/cpp/src/qpid/management/Vhost.h b/cpp/src/qpid/management/Vhost.h
new file mode 100644
index 0000000000..5fc5a2870b
--- /dev/null
+++ b/cpp/src/qpid/management/Vhost.h
@@ -0,0 +1,65 @@
+#ifndef _ManagementVhost_
+#define _ManagementVhost_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+#include "ManagementObject.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid {
+namespace management {
+
+class Vhost : public ManagementObject
+{
+ public:
+
+ typedef boost::shared_ptr<Vhost> shared_ptr;
+
+ Vhost (Manageable* coreObject, Manageable* parentObject);
+ ~Vhost (void);
+
+ private:
+
+ static bool schemaNeeded;
+
+ uint64_t brokerRef;
+ std::string name;
+
+ uint16_t getObjectType (void) { return OBJECT_VHOST; }
+ std::string getObjectName (void) { return "vhost"; }
+ void writeSchema (qpid::framing::Buffer& buf);
+ void writeConfig (qpid::framing::Buffer& buf);
+ void writeInstrumentation (qpid::framing::Buffer& /*buf*/) {}
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+ void setSchemaNeeded (void) { schemaNeeded = true; }
+ void doMethod (std::string /*methodName*/,
+ qpid::framing::Buffer& /*inBuf*/,
+ qpid::framing::Buffer& /*outBuf*/) {}
+
+ inline bool getInstChanged (void) { return false; }
+};
+
+}}
+
+
+#endif /*!_ManagementVhost_*/