summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management/Broker.cpp')
-rw-r--r--cpp/src/qpid/management/Broker.cpp176
1 files changed, 152 insertions, 24 deletions
diff --git a/cpp/src/qpid/management/Broker.cpp b/cpp/src/qpid/management/Broker.cpp
index 8626654c43..2c27512669 100644
--- a/cpp/src/qpid/management/Broker.cpp
+++ b/cpp/src/qpid/management/Broker.cpp
@@ -21,6 +21,7 @@
#include "config.h"
#include "qpid/broker/Broker.h"
+#include "qpid/framing/FieldTable.h"
#include "Broker.h"
#include "ArgsBrokerEcho.h"
@@ -31,7 +32,7 @@ using namespace qpid::framing;
bool Broker::schemaNeeded = true;
Broker::Broker (Manageable* _core, const Options& _conf) :
- ManagementObject (_core)
+ ManagementObject (_core, "broker")
{
broker::Broker::Options& conf = (broker::Broker::Options&) _conf;
@@ -54,28 +55,149 @@ Broker::~Broker () {}
void Broker::writeSchema (Buffer& buf)
{
+ FieldTable ft;
+ FieldTable arg;
+
schemaNeeded = false;
- schemaListBegin (buf);
- schemaItem (buf, TYPE_UINT32, "systemRef", "System ID", true, true);
- schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true, true);
- schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true);
- schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true);
- schemaItem (buf, TYPE_UINT16, "connBacklog",
- "Connection backlog limit for listening socket", true);
- schemaItem (buf, TYPE_UINT32, "stagingThreshold",
- "Broker stages messages over this size to disk", true);
- schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true);
- schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true);
- schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true);
- schemaItem (buf, TYPE_UINT32, "initialDiskPageSize",
- "Number of disk pages allocated for storage", true);
- schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue",
- "Number of disk pages allocated per queue", true);
- schemaItem (buf, TYPE_STRING, "clusterName",
- "Name of cluster this server is a member of, zero-length for standalone server", true);
- schemaItem (buf, TYPE_STRING, "version", "Running software version", true);
- schemaListEnd (buf);
+ // Schema class header:
+ buf.putShortString (className); // Class Name
+ buf.putShort (13); // Config Element Count
+ buf.putShort (0); // Inst Element Count
+ buf.putShort (1); // Method Count
+ buf.putShort (0); // Event Count
+
+ // Config Elements
+ ft = FieldTable ();
+ ft.setString ("name", "systemRef");
+ ft.setInt ("type", TYPE_U64);
+ ft.setInt ("access", ACCESS_RC);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "System ID");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "port");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RC);
+ ft.setInt ("index", 1);
+ ft.setString ("desc", "TCP Port for AMQP Service");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "workerThreads");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Thread pool size");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "maxConns");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Maximum allowed connections");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "connBacklog");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Connection backlog limit for listening socket");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "stagingThreshold");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Broker stages messages over this size to disk");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "storeLib");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Name of persistent storage library");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "asyncStore");
+ ft.setInt ("type", TYPE_U8);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Use async persistent store");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "mgmtPubInterval");
+ ft.setInt ("type", TYPE_U16);
+ ft.setInt ("access", ACCESS_RW);
+ ft.setInt ("index", 0);
+ ft.setInt ("min", 1);
+ ft.setString ("unit", "second");
+ ft.setString ("desc", "Interval for management broadcasts");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "initialDiskPageSize");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Number of disk pages allocated for storage");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "initialPagesPerQueue");
+ ft.setInt ("type", TYPE_U32);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Number of disk pages allocated per queue");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "clusterName");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Name of cluster this server is a member of, zero-length for standalone server");
+ buf.put (ft);
+
+ ft = FieldTable ();
+ ft.setString ("name", "version");
+ ft.setInt ("type", TYPE_SSTR);
+ ft.setInt ("access", ACCESS_RO);
+ ft.setInt ("index", 0);
+ ft.setString ("desc", "Running software version");
+ buf.put (ft);
+
+ // Inst Elements
+
+ return; // TODO - Remove
+
+ // Methods
+ ft = FieldTable ();
+ ft.setString ("name", "echo");
+ ft.setInt ("args", 2);
+
+ arg = FieldTable ();
+ arg.setString ("name", "sequence");
+ arg.setInt ("type", TYPE_U32);
+ arg.setInt ("dir", DIR_IO);
+ ft.setTable ("arg", arg);
+
+ arg = FieldTable ();
+ arg.setString ("name", "body");
+ arg.setInt ("type", TYPE_LSTR);
+ arg.setInt ("dir", DIR_IO);
+ ft.setTable ("arg", arg);
+
+ buf.put (ft);
+
+ // Events
}
void Broker::writeConfig (Buffer& buf)
@@ -83,7 +205,7 @@ void Broker::writeConfig (Buffer& buf)
configChanged = false;
writeTimestamps (buf);
- buf.putLong (0);
+ buf.putLongLong (0);
buf.putShort (port);
buf.putShort (workerThreads);
buf.putShort (maxConns);
@@ -99,8 +221,8 @@ void Broker::writeConfig (Buffer& buf)
}
void Broker::doMethod (string methodName,
- Buffer& inBuf,
- Buffer& outBuf)
+ Buffer& inBuf,
+ Buffer& outBuf)
{
if (methodName.compare ("echo") == 0)
{
@@ -117,6 +239,12 @@ void Broker::doMethod (string methodName,
outBuf.putLong (args.io_sequence);
outBuf.putLongString (args.io_body);
}
+
+ // TODO - Remove this method prior to beta
+ else if (methodName.compare ("crash") == 0)
+ {
+ assert (0);
+ }
else
{
outBuf.putLong (1);