diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/System.cpp | 3 |
7 files changed, 19 insertions, 11 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d94f228734..24c5a0c049 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -93,7 +93,8 @@ Broker::Options::Options(const std::string& name) : tcpNoDelay(false), requireEncrypted(false), maxSessionRate(0), - asyncQueueEvents(false) // Must be false in a cluster. + asyncQueueEvents(false), // Must be false in a cluster. + qmf2Support(false) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -114,6 +115,7 @@ Broker::Options::Options(const std::string& name) : ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("staging-threshold", optValue(stagingThreshold, "N"), "Stages messages over N bytes to disk") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Use QMF v2 for Broker Management") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -138,7 +140,9 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), + managementAgent(conf.enableMgmt ? new ManagementAgent(!conf.qmf2Support, + conf.qmf2Support) + : 0), store(new NullMessageStore), acl(0), dataDir(conf.noDataDir ? std::string() : conf.dataDir), @@ -164,6 +168,7 @@ Broker::Broker(const Broker::Options& conf) : QPID_LOG(info, "Management enabled"); managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPubInterval, this, conf.workerThreads + 3); + managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 465a17f4eb..f9be992f0c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -113,6 +113,7 @@ public: std::string knownHosts; uint32_t maxSessionRate; bool asyncQueueEvents; + bool qmf2Support; private: std::string getHome(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 7bb70ed24a..1d3da982d8 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -149,7 +149,7 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name); mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); - mgmtExchange->set_arguments(args); + mgmtExchange->set_arguments(ManagementAgent::toMap(args)); if (!durable) { if (name.empty()) { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID @@ -336,7 +336,7 @@ void Exchange::Binding::startManagement() { management::ObjectId queueId = mo->getObjectId(); mgmtBinding = new _qmf::Binding - (agent, this, (Manageable*) parent, queueId, key, args); + (agent, this, (Manageable*) parent, queueId, key, ManagementAgent::toMap(args)); if (!origin.empty()) mgmtBinding->set_origin(origin); agent->addObject (mgmtBinding, agent->allocateId(this)); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9e379dfc49..8d9248212f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -873,7 +873,7 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (p && p->convertsTo<std::string>()) insertSequenceNumbers(p->get<std::string>()); if (mgmtObject != 0) - mgmtObject->set_arguments (_settings); + mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 73ef807a0a..5148d88e72 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -280,7 +280,7 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, if (agent != 0) { mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, - !acquire, ackExpected, exclusive ,arguments); + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject, agent->allocateId(this)); mgmtObject->set_creditMode("WINDOW"); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c3b6f697fd..10eddc6045 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -106,7 +106,7 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), getConnection().getUserId(), exchange, type, - alternateExchange, durable, false, args, + alternateExchange, durable, false, ManagementAgent::toMap(args), response.second ? "created" : "existing")); }catch(UnknownExchangeTypeException& /*e*/){ @@ -194,7 +194,8 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) - agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, queueName, exchangeRoutingKey, arguments)); + agent->raiseEvent(_qmf::EventBind(getConnection().getUrl(), getConnection().getUserId(), exchangeName, + queueName, exchangeRoutingKey, ManagementAgent::toMap(arguments))); } }else{ throw NotFoundException("Bind failed. No such exchange: " + exchangeName); @@ -389,7 +390,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, arguments, + name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), queue_created.second ? "created" : "existing")); } @@ -499,7 +500,7 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), - queueName, destination, exclusive, arguments)); + queueName, destination, exclusive, ManagementAgent::toMap(arguments))); } void diff --git a/cpp/src/qpid/broker/System.cpp b/cpp/src/qpid/broker/System.cpp index 455ad11cf2..90c6b13cd3 100644 --- a/cpp/src/qpid/broker/System.cpp +++ b/cpp/src/qpid/broker/System.cpp @@ -22,6 +22,7 @@ #include "qpid/management/ManagementAgent.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/types/Uuid.h" #include <iostream> #include <fstream> @@ -64,7 +65,7 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } } - mgmtObject = new _qmf::System (agent, this, systemId); + mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); std::string sysname, nodename, release, version, machine; qpid::sys::SystemInfo::getSystemId (sysname, nodename, |
