summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp9
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp9
-rw-r--r--cpp/src/qpid/broker/System.cpp3
7 files changed, 19 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index d94f228734..24c5a0c049 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) :
tcpNoDelay(false),
requireEncrypted(false),
maxSessionRate(0),
- asyncQueueEvents(false) // Must be false in a cluster.
+ asyncQueueEvents(false), // Must be false in a cluster.
+ qmf2Support(false)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) :
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
@@ -138,7 +140,9 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
config(conf),
- managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
+ managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support,
+ conf.qmf2Support)
+ : 0),
store(new NullMessageStore),
acl(0),
dataDir(conf.noDataDir ? std::string() : conf.dataDir),
@@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) :
QPID_LOG(info, "Management enabled");
managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
+ managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 465a17f4eb..f9be992f0c 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -113,6 +113,7 @@ public:
std::string knownHosts;
uint32_t maxSessionRate;
bool asyncQueueEvents;
+ bool qmf2Support;
private:
std::string getHome();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 7bb70ed24a..1d3da982d8 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name);
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
- mgmtExchange->set_arguments(args);
+ mgmtExchange->set_arguments(ManagementAgent::toMap(args));
if (!durable) {
if (name.empty()) {
agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
@@ -336,7 +336,7 @@ void Exchange::Binding::startManagement()
{
management::ObjectId queueId = mo->getObjectId();
mgmtBinding = new _qmf::Binding
- (agent, this, (Manageable*) parent, queueId, key, args);
+ (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args));
if (!origin.empty())
mgmtBinding->set_origin(origin);
agent->addObject (mgmtBinding, agent->allocateId(this));
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 9e379dfc49..8d9248212f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering)
if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>());
if (mgmtObject != 0)
- mgmtObject->set_arguments (_settings);
+ mgmtObject->set_arguments(ManagementAgent::toMap(_settings));
if ( isDurable() && ! getPersistenceId() && ! recovering )
store->create(*this, _settings);
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 73ef807a0a..5148d88e72 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
if (agent != 0)
{
mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name,
- !acquire, ackExpected, exclusive ,arguments);
+ !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments));
agent->addObject (mgmtObject, agent->allocateId(this));
mgmtObject->set_creditMode("WINDOW");
}
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index c3b6f697fd..10eddc6045 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type,
- alternateExchange, durable, false, args,
+ alternateExchange, durable, false, ManagementAgent::toMap(args),
response.second ? "created" : "existing"));
}catch(UnknownExchangeTypeException& /*e*/){
@@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments));
+ agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName,
+ queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments)));
}
}else{
throw NotFoundException("Bind failed. No such exchange: " + exchangeName);
@@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, arguments,
+ name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
queue_created.second ? "created" : "existing"));
}
@@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
- queueName, destination, exclusive, arguments));
+ queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
}
void
diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp
index 455ad11cf2..90c6b13cd3 100644
--- a/cpp/src/qpid/broker/System.cpp
+++ b/cpp/src/qpid/broker/System.cpp
@@ -22,6 +22,7 @@
#include "qpid/management/ManagementAgent.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
#include <iostream>
#include <fstream>
@@ -64,7 +65,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
}
}
- mgmtObject = new _qmf::System (agent, this, systemId);
+ mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
std::string sysname, nodename, release, version, machine;
qpid::sys::SystemInfo::getSystemId (sysname,
nodename,