diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/Broker.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 286 |
1 files changed, 149 insertions, 137 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 08606516d4..49e80689fc 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,6 +20,8 @@ */ #include "qpid/broker/Broker.h" + +#include "qpid/broker/AclModule.h" #include "qpid/broker/AsyncResultHandle.h" #include "qpid/broker/ConfigAsyncContext.h" #include "qpid/broker/ConfigHandle.h" @@ -27,9 +29,7 @@ #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" -//#include "qpid/broker/MessageStoreModule.h" -//#include "qpid/broker/NullMessageStore.h" -//#include "qpid/broker/RecoveryAsyncContext.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" #include "qpid/broker/SecureConnectionFactory.h" @@ -43,6 +43,7 @@ #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" @@ -50,12 +51,12 @@ #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qpid/amqp_0_10/Codecs.h" @@ -75,6 +76,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -110,6 +112,17 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); +const std::string qpid_management("qpid.management"); +const std::string knownHostsNone("none"); + +// static +Broker* Broker::thisBroker; + Broker::Options::Options(const std::string& name) : qpid::Options(name), noDataDir(0), @@ -127,6 +140,7 @@ Broker::Options::Options(const std::string& name) : queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), requireEncrypted(false), + knownHosts(knownHostsNone), qmf2Support(true), qmf1Support(true), queueFlowStopRatio(80), @@ -136,7 +150,7 @@ Broker::Options::Options(const std::string& name) : timestampRcvMsgs(false), // set the 0.10 timestamp delivery property linkMaintenanceInterval(2), linkHeartbeatInterval(120), - maxNegotiateTime(2000) // 2s + maxNegotiateTime(10000) // 10s { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -152,6 +166,7 @@ Broker::Options::Options(const std::string& name) : ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") + ("interface", optValue(listenInterfaces, "<interface name>|<interface address>"), "Which network interfaces to use to listen for incoming connections") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") @@ -175,23 +190,27 @@ Broker::Options::Options(const std::string& name) : ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.") - ("link-maintenace-interval", optValue(linkMaintenanceInterval, "SECONDS")) - ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS")) - ("max-negotiate-time", optValue(maxNegotiateTime, "MilliSeconds"), "Maximum time a connection can take to send the initial protocol negotiation") + ("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"), + "Interval to check link health and re-connect if need be") + ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"), + "Heartbeat interval for a federation link") + ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation") ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag") ; } -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); -const std::string qpid_management("qpid.management"); -const std::string knownHostsNone("none"); +namespace { +// Arguments to declare a non-replicated exchange. +framing::FieldTable noReplicateArgs() { + framing::FieldTable args; + args.setString("qpid.replicate", "none"); + return args; +} +} Broker::Broker(const Broker::Options& conf) : poller(new Poller), + timer(new qpid::sys::Timer), config(conf), managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) @@ -205,21 +224,18 @@ Broker::Broker(const Broker::Options& conf) : exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), - dtxManager(timer), + dtxManager(*timer.get()), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - mgmtObject(0), - queueCleaner(queues, &timer), - recovery(true), - inCluster(false), - clusterUpdatee(false), + queueCleaner(queues, timer.get()), + recoveryInProgress(false), expiryPolicy(new ExpiryPolicy), - getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), - deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { + thisBroker = this; try { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); @@ -231,7 +247,7 @@ Broker::Broker(const Broker::Options& conf) : System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); systemObject = System::shared_ptr(system); - mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); + mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker")); mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); @@ -281,28 +297,26 @@ Broker::Broker(const Broker::Options& conf) : // if (NullMessageStore::isNullStore(store.get())) // setStore(); - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + framing::FieldTable args; + + // Default exchnge is not replicated. + exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs()); // if (store.get() != 0) { if (asyncStore.get() != 0) { - // The cluster plug-in will setRecovery(false) on all but the first - // broker to join a cluster. - if (getRecovery()) { QPID_LOG(info, "Store recovery starting") - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); + RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry); RecoveryHandle rh = asyncStore->createRecoveryHandle(); - boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverComplete, &asyncResultQueue)); + boost::shared_ptr<RecoveryAsyncContext> rac(new RecoveryAsyncContext(recoverer, &recoverCompleteCb, &asyncResultQueue)); asyncStore->submitRecover(rh, rac); -// store->recover(recoverer); - } - else { - QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); -// store->truncateInit(true); // save old files in subdir - asyncStore->initialize(true, true); - } +// RecoveryManagerImpl recoverer( +// queues, exchanges, links, dtxManager, protocolRegistry); +// recoveryInProgress = true; +// store->recover(recoverer); +// recoveryInProgress = false; } // debug - else QPID_LOG(info, ">>>> No store!!!!") +// else QPID_LOG(info, ">>>> No store!!!!") //ensure standard exchanges exist (done after recovery from store) declareStandardExchange(amq_direct, DirectExchange::typeName); @@ -311,7 +325,7 @@ Broker::Broker(const Broker::Options& conf) : declareStandardExchange(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { - exchanges.declare(qpid_management, ManagementTopicExchange::typeName); + exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, noReplicateArgs()); Exchange::shared_ptr mExchange = exchanges.get(qpid_management); Exchange::shared_ptr dExchange = exchanges.get(amq_direct); managementAgent->setExchange(mExchange, dExchange); @@ -320,8 +334,10 @@ Broker::Broker(const Broker::Options& conf) : std::string qmfTopic("qmf.default.topic"); std::string qmfDirect("qmf.default.direct"); - std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName)); - std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName)); + std::pair<Exchange::shared_ptr, bool> topicPair( + exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, noReplicateArgs())); + std::pair<Exchange::shared_ptr, bool> directPair( + exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, noReplicateArgs())); boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); @@ -349,19 +365,17 @@ Broker::Broker(const Broker::Options& conf) : // Initialize plugins Plugin::initializeAll(*this); - if (managementAgent.get()) managementAgent->pluginsInitialized(); + if(conf.enableMgmt) { + if (getAcl()) { + mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal()); + } + } if (conf.queueCleanInterval) { queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); } - //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): - if (conf.knownHosts.empty()) { - boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); - if (factory) { - knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); - } - } else if (conf.knownHosts != knownHostsNone) { + if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) { knownBrokers.push_back(Url(conf.knownHosts)); } @@ -375,11 +389,14 @@ void Broker::declareStandardExchange(const std::string& name, const std::string& { // bool storeEnabled = store.get() != NULL; bool storeEnabled = asyncStore.get() != NULL; - std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); + framing::FieldTable args; + // Standard exchanges are not replicated. + std::pair<Exchange::shared_ptr, bool> status = + exchanges.declare(name, type, storeEnabled, noReplicateArgs()); if (status.second && storeEnabled) { // store->create(*status.first, framing::FieldTable ()); ConfigHandle ch = asyncStore->createConfigHandle(); - boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue)); asyncStore->submitCreate(ch, status.first.get(), bc); } } @@ -420,11 +437,20 @@ void Broker::setStore () { } // static +void Broker::recoverCompleteCb(const AsyncResultHandle* const arh) { + thisBroker->recoverComplete(arh); +} + void Broker::recoverComplete(const AsyncResultHandle* const arh) { + recoveryInProgress = false; std::cout << "@@@@ Broker: Recover complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } // static +void Broker::configureCompleteCb(const AsyncResultHandle* const arh) { + thisBroker->configureComplete(arh); +} + void Broker::configureComplete(const AsyncResultHandle* const arh) { std::cout << "@@@@ Broker: Configure complete: err=" << arh->getErrNo() << "; msg=\"" << arh->getErrMsg() << "\"" << std::endl; } @@ -464,13 +490,13 @@ Broker::~Broker() { finalize(); // Finalize any plugins. if (config.auth) SaslAuthenticator::fini(); - timer.stop(); + timer->stop(); QPID_LOG(notice, "Shut down"); } -ManagementObject* Broker::GetManagementObject(void) const +ManagementObject::shared_ptr Broker::GetManagementObject(void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } Manageable* Broker::GetVhostObject(void) const @@ -536,7 +562,7 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter) >= 0) status = Manageable::STATUS_OK; else return Manageable::STATUS_PARAMETER_INVALID; @@ -584,7 +610,22 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, status = setTimestampConfig(a.i_receive, getManagementExecutionContext()); break; } - default: + + case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP: + { + dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp(); + QPID_LOG (debug, "Broker::getLogHiresTimestamp()"); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP: + { + setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires); + QPID_LOG (debug, "Broker::setLogHiresTimestamp()"); + status = Manageable::STATUS_OK; + break; + } + default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; break; @@ -750,7 +791,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else extensions[i->first] = i->second; } framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); + qpid::amqp_0_10::translate(extensions, arguments); try { std::pair<boost::shared_ptr<Exchange>, bool> result = @@ -772,7 +813,7 @@ void Broker::createObject(const std::string& type, const std::string& name, else extensions[i->first] = i->second; } framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); + qpid::amqp_0_10::translate(extensions, arguments); bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); @@ -1008,6 +1049,18 @@ std::string Broker::getLogLevel() return level; } +void Broker::setLogHiresTimestamp(bool enabled) +{ + QPID_LOG(notice, "Changing log hires timestamp to " << enabled); + qpid::log::Logger::instance().setHiresTimestamp(enabled); +} + +bool Broker::getLogHiresTimestamp() +{ + return qpid::log::Logger::instance().getHiresTimestamp(); +} + + boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { ProtocolFactoryMap::const_iterator i = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); @@ -1036,39 +1089,29 @@ void Broker::accept() { } void Broker::connect( + const std::string& name, const std::string& host, const std::string& port, const std::string& transport, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* f) + boost::function2<void, int, std::string> failed) { boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport); - if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed); + if (pf) pf->connect(poller, name, host, port, factory.get(), failed); else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); } -void Broker::connect( - const Url& url, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* f) -{ - url.throwIfEmpty(); - const Address& addr=url[0]; - connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f); -} - -uint32_t Broker::queueMoveMessages( +int32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, uint32_t qty, const Variant::Map& filter) { - Queue::shared_ptr src_queue = queues.find(srcQueue); - if (!src_queue) - return 0; - Queue::shared_ptr dest_queue = queues.find(destQueue); - if (!dest_queue) - return 0; - - return src_queue->move(dest_queue, qty, &filter); + Queue::shared_ptr src_queue = queues.find(srcQueue); + if (!src_queue) + return -1; + Queue::shared_ptr dest_queue = queues.find(destQueue); + if (!dest_queue) + return -1; + + return (int32_t) src_queue->move(dest_queue, qty, &filter); } @@ -1083,12 +1126,6 @@ Broker::getKnownBrokersImpl() bool Broker::deferDeliveryImpl(const std::string&, const Message&) { return false; } -void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { - clusterTimer = t; - queueCleaner.setTimer(clusterTimer.get()); - dtxManager.setTimer(*clusterTimer.get()); -} - const std::string Broker::TCP_TRANSPORT("tcp"); @@ -1109,9 +1146,14 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( params.insert(make_pair(acl::PROP_POLICYTYPE, settings.dropMessagesAtLimit ? "ring" : "reject")); params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(settings.maxDepth.getCount()))); params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(settings.maxDepth.getSize()))); + params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast<string>(settings.maxFileCount))); + params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast<string>(settings.maxFileSize))); if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); + + if (!acl->approveCreateQueue(userId,name) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); } Exchange::shared_ptr alternate; @@ -1120,21 +1162,12 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, settings, alternate); + std::pair<Queue::shared_ptr, bool> result = + queues.declare(name, settings, alternate, false/*recovering*/, + owner, connectionId, userId); if (result.second) { //add default binding: result.first->bind(exchanges.getDefault(), name, qpid::framing::FieldTable()); - - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent( - _qmf::EventQueueDeclare(connectionId, userId, name, - settings.durable, owner, settings.autodelete, alternateExchange, - settings.asMap(), - "created")); - } QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << userId << " rhost:" << connectionId @@ -1149,6 +1182,10 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( void Broker::deleteQueue(const std::string& name, const std::string& userId, const std::string& connectionId, QueueFunctor check) { + QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name + << " user:" << userId + << " rhost:" << connectionId + ); if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); } @@ -1156,19 +1193,13 @@ void Broker::deleteQueue(const std::string& name, const std::string& userId, Queue::shared_ptr queue = queues.find(name); if (queue) { if (check) check(queue); - queues.destroy(name); + if (acl) + acl->recordDestroyQueue(name); + queues.destroy(name, connectionId, userId); queue->destroyed(); } else { throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); } - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); - QPID_LOG_CAT(debug, model, "Delete queue. name:" << name - << " user:" << userId - << " rhost:" << connectionId - ); - } std::pair<Exchange::shared_ptr, bool> Broker::createExchange( @@ -1196,33 +1227,16 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( } std::pair<Exchange::shared_ptr, bool> result; - result = exchanges.declare(name, type, durable, arguments); + result = exchanges.declare( + name, type, durable, arguments, alternate, connectionId, userId); if (result.second) { - if (alternate) { - result.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } if (durable) { // store->create(*result.first, arguments); ConfigHandle ch = asyncStore->createConfigHandle(); result.first->setHandle(ch); - boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue)); asyncStore->submitCreate(ch, result.first.get(), bc); } - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, - userId, - name, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(arguments), - "created")); - } QPID_LOG_CAT(debug, model, "Create exchange. name:" << name << " user:" << userId << " rhost:" << connectionId @@ -1236,6 +1250,9 @@ std::pair<Exchange::shared_ptr, bool> Broker::createExchange( void Broker::deleteExchange(const std::string& name, const std::string& userId, const std::string& connectionId) { + QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name + << " user:" << userId + << " rhost:" << connectionId); if (acl) { if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); @@ -1246,21 +1263,15 @@ void Broker::deleteExchange(const std::string& name, const std::string& userId, } Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); - if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange.")); // if (exchange->isDurable()) store->destroy(*exchange); if (exchange->isDurable()) { - boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureComplete, &asyncResultQueue)); + boost::shared_ptr<BrokerAsyncContext> bc(new ConfigAsyncContext(&configureCompleteCb, &asyncResultQueue)); asyncStore->submitDestroy(exchange->getHandle(), bc); exchange->resetHandle(); } if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - exchanges.destroy(name); - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); - QPID_LOG_CAT(debug, model, "Delete exchange. name:" << name - << " user:" << userId - << " rhost:" << connectionId); + exchanges.destroy(name, connectionId, userId); } void Broker::bind(const std::string& queueName, @@ -1298,6 +1309,7 @@ void Broker::bind(const std::string& queueName, QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key + << " arguments:" << arguments << " user:" << userId << " rhost:" << connectionId); } |