diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/broker/Broker.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 967 |
1 files changed, 0 insertions, 967 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp deleted file mode 100644 index 240766c443..0000000000 --- a/cpp/src/qpid/broker/Broker.cpp +++ /dev/null @@ -1,967 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Broker.h" -#include "qpid/broker/ConnectionState.h" -#include "qpid/broker/DirectExchange.h" -#include "qpid/broker/FanOutExchange.h" -#include "qpid/broker/HeadersExchange.h" -#include "qpid/broker/MessageStoreModule.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/RecoveryManagerImpl.h" -#include "qpid/broker/SaslAuthenticator.h" -#include "qpid/broker/SecureConnectionFactory.h" -#include "qpid/broker/TopicExchange.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/ExpiryPolicy.h" -#include "qpid/broker/QueueFlowLimit.h" - -#include "qmf/org/apache/qpid/broker/Package.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" -#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" -#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" -#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" -#include "qmf/org/apache/qpid/broker/EventBind.h" -#include "qmf/org/apache/qpid/broker/EventUnbind.h" -#include "qpid/amqp_0_10/Codecs.h" -#include "qpid/management/ManagementDirectExchange.h" -#include "qpid/management/ManagementTopicExchange.h" -#include "qpid/log/Logger.h" -#include "qpid/log/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/log/posix/SinkOptions.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/ProtocolFactory.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/SystemInfo.h" -#include "qpid/Address.h" -#include "qpid/StringUtils.h" -#include "qpid/Url.h" -#include "qpid/Version.h" - -#include <boost/bind.hpp> -#include <boost/format.hpp> - -#include <iostream> -#include <memory> - -using qpid::sys::ProtocolFactory; -using qpid::sys::Poller; -using qpid::sys::Dispatcher; -using qpid::sys::Thread; -using qpid::framing::FrameHandler; -using qpid::framing::ChannelId; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -using qpid::management::getManagementExecutionContext; -using qpid::types::Variant; -using std::string; -using std::make_pair; - -namespace _qmf = qmf::org::apache::qpid::broker; - -namespace qpid { -namespace broker { - -Broker::Options::Options(const std::string& name) : - qpid::Options(name), - noDataDir(0), - port(DEFAULT_PORT), - workerThreads(5), - maxConnections(500), - connectionBacklog(10), - enableMgmt(1), - mgmtPubInterval(10), - queueCleanInterval(60*10),//10 minutes - auth(SaslAuthenticator::available()), - realm("QPID"), - replayFlushLimit(0), - replayHardLimit(0), - queueLimit(100*1048576/*100M default limit*/), - tcpNoDelay(false), - requireEncrypted(false), - maxSessionRate(0), - asyncQueueEvents(false), // Must be false in a cluster. - qmf2Support(true), - qmf1Support(true), - queueFlowStopRatio(80), - queueFlowResumeRatio(70), - queueThresholdEventRatio(80) -{ - int c = sys::SystemInfo::concurrency(); - workerThreads=c+1; - std::string home = getHome(); - - if (home.length() == 0) - dataDir += DEFAULT_DATA_DIR_LOCATION; - else - dataDir += home; - dataDir += DEFAULT_DATA_DIR_NAME; - - addOptions() - ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") - ("no-data-dir", optValue(noDataDir), "Don't use a data directory. No persistent configuration will be loaded or stored") - ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") - ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") - ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") - ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") - ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") - ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") - ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") - ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") - ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), - "Interval between attempts to purge any expired messages from queues") - ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") - ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") - ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") - ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") - ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") - ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") - ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location") - ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") - ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") - ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") - ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised"); -} - -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); -const std::string qpid_management("qpid.management"); -const std::string knownHostsNone("none"); - -Broker::Broker(const Broker::Options& conf) : - poller(new Poller), - config(conf), - managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, - conf.qmf2Support) - : 0), - store(new NullMessageStore), - acl(0), - dataDir(conf.noDataDir ? std::string() : conf.dataDir), - queues(this), - exchanges(this), - links(this), - factory(new SecureConnectionFactory(*this)), - dtxManager(timer), - sessionManager( - qpid::SessionState::Configuration( - conf.replayFlushLimit*1024, // convert kb to bytes. - conf.replayHardLimit*1024), - *this), - queueCleaner(queues, timer), - queueEvents(poller,!conf.asyncQueueEvents), - recovery(true), - inCluster(false), - clusterUpdatee(false), - expiryPolicy(new ExpiryPolicy), - connectionCounter(conf.maxConnections), - getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), - deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) -{ - try { - if (conf.enableMgmt) { - QPID_LOG(info, "Management enabled"); - managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), - conf.mgmtPubInterval, this, conf.workerThreads + 3); - managementAgent->setName("apache.org", "qpidd"); - _qmf::Package packageInitializer(managementAgent.get()); - - System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this); - systemObject = System::shared_ptr(system); - - mgmtObject = new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"); - mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId()); - mgmtObject->set_port(conf.port); - mgmtObject->set_workerThreads(conf.workerThreads); - mgmtObject->set_maxConns(conf.maxConnections); - mgmtObject->set_connBacklog(conf.connectionBacklog); - mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); - mgmtObject->set_version(qpid::version); - if (dataDir.isEnabled()) - mgmtObject->set_dataDir(dataDir.getPath()); - else - mgmtObject->clr_dataDir(); - - managementAgent->addObject(mgmtObject, 0, true); - - // Since there is currently no support for virtual hosts, a placeholder object - // representing the implied single virtual host is added here to keep the - // management schema correct. - Vhost* vhost = new Vhost(this, this); - vhostObject = Vhost::shared_ptr(vhost); - framing::Uuid uuid(managementAgent->getUuid()); - federationTag = uuid.str(); - vhostObject->setFederationTag(federationTag); - - queues.setParent(vhost); - exchanges.setParent(vhost); - links.setParent(vhost); - } else { - // Management is disabled so there is no broker management ID. - // Create a unique uuid to use as the federation tag. - framing::Uuid uuid(true); - federationTag = uuid.str(); - } - - QueuePolicy::setDefaultMaxSize(conf.queueLimit); - - // Early-Initialize plugins - Plugin::earlyInitAll(*this); - - QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); - - // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) - setStore(); - - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - - if (store.get() != 0) { - // The cluster plug-in will setRecovery(false) on all but the first - // broker to join a cluster. - if (getRecovery()) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager); - store->recover(recoverer); - } - else { - QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); - store->truncateInit(true); // save old files in subdir - } - } - - //ensure standard exchanges exist (done after recovery from store) - declareStandardExchange(amq_direct, DirectExchange::typeName); - declareStandardExchange(amq_topic, TopicExchange::typeName); - declareStandardExchange(amq_fanout, FanOutExchange::typeName); - declareStandardExchange(amq_match, HeadersExchange::typeName); - - if(conf.enableMgmt) { - exchanges.declare(qpid_management, ManagementTopicExchange::typeName); - Exchange::shared_ptr mExchange = exchanges.get(qpid_management); - Exchange::shared_ptr dExchange = exchanges.get(amq_direct); - managementAgent->setExchange(mExchange, dExchange); - boost::dynamic_pointer_cast<ManagementTopicExchange>(mExchange)->setManagmentAgent(managementAgent.get(), 1); - - std::string qmfTopic("qmf.default.topic"); - std::string qmfDirect("qmf.default.direct"); - - std::pair<Exchange::shared_ptr, bool> topicPair(exchanges.declare(qmfTopic, ManagementTopicExchange::typeName)); - std::pair<Exchange::shared_ptr, bool> directPair(exchanges.declare(qmfDirect, ManagementDirectExchange::typeName)); - - boost::dynamic_pointer_cast<ManagementDirectExchange>(directPair.first)->setManagmentAgent(managementAgent.get(), 2); - boost::dynamic_pointer_cast<ManagementTopicExchange>(topicPair.first)->setManagmentAgent(managementAgent.get(), 2); - - managementAgent->setExchangeV2(topicPair.first, directPair.first); - } - else - QPID_LOG(info, "Management not enabled"); - - /** - * SASL setup, can fail and terminate startup - */ - if (conf.auth) { - SaslAuthenticator::init(qpid::saslName, conf.saslConfigPath); - QPID_LOG(info, "SASL enabled"); - } else { - QPID_LOG(notice, "SASL disabled: No Authentication Performed"); - } - - // Initialize plugins - Plugin::initializeAll(*this); - - if (managementAgent.get()) managementAgent->pluginsInitialized(); - - if (conf.queueCleanInterval) { - queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); - } - - //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): - if (conf.knownHosts.empty()) { - boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); - if (factory) { - knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( factory->getPort() ) ); - } - } else if (conf.knownHosts != knownHostsNone) { - knownBrokers.push_back(Url(conf.knownHosts)); - } - } catch (const std::exception& /*e*/) { - finalize(); - throw; - } -} - -void Broker::declareStandardExchange(const std::string& name, const std::string& type) -{ - bool storeEnabled = store.get() != NULL; - std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); - if (status.second && storeEnabled) { - store->create(*status.first, framing::FieldTable ()); - } -} - - -boost::intrusive_ptr<Broker> Broker::create(int16_t port) -{ - Options config; - config.port=port; - return create(config); -} - -boost::intrusive_ptr<Broker> Broker::create(const Options& opts) -{ - return boost::intrusive_ptr<Broker>(new Broker(opts)); -} - -void Broker::setStore (boost::shared_ptr<MessageStore>& _store) -{ - store.reset(new MessageStoreModule (_store)); - setStore(); -} - -void Broker::setStore () { - queues.setStore (store.get()); - dtxManager.setStore (store.get()); - links.setStore (store.get()); -} - -void Broker::run() { - if (config.workerThreads > 0) { - QPID_LOG(notice, "Broker running"); - Dispatcher d(poller); - int numIOThreads = config.workerThreads; - std::vector<Thread> t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); - - // Run final thread - d.run(); - - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } - } else { - throw Exception((boost::format("Invalid value for worker-threads: %1%") % config.workerThreads).str()); - } -} - -void Broker::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - // Any unsafe shutdown actions should be done in the destructor. - poller->shutdown(); -} - -Broker::~Broker() { - shutdown(); - queueEvents.shutdown(); - finalize(); // Finalize any plugins. - if (config.auth) - SaslAuthenticator::fini(); - timer.stop(); - QPID_LOG(notice, "Shut down"); -} - -ManagementObject* Broker::GetManagementObject(void) const -{ - return (ManagementObject*) mgmtObject; -} - -Manageable* Broker::GetVhostObject(void) const -{ - return vhostObject.get(); -} - -Manageable::status_t Broker::ManagementMethod (uint32_t methodId, - Args& args, - string&) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - - switch (methodId) - { - case _qmf::Broker::METHOD_ECHO : - QPID_LOG (debug, "Broker::echo(" - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence - << ", " - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body - << ")"); - status = Manageable::STATUS_OK; - break; - case _qmf::Broker::METHOD_CONNECT : { - _qmf::ArgsBrokerConnect& hp= - dynamic_cast<_qmf::ArgsBrokerConnect&>(args); - - QPID_LOG (debug, "Broker::connect()"); - string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; - if (!getProtocolFactory(transport)) { - QPID_LOG(error, "Transport '" << transport << "' not supported"); - return Manageable::STATUS_NOT_IMPLEMENTED; - } - std::pair<Link::shared_ptr, bool> response = - links.declare (hp.i_host, hp.i_port, transport, hp.i_durable, - hp.i_authMechanism, hp.i_username, hp.i_password); - if (hp.i_durable && response.second) - store->create(*response.first); - status = Manageable::STATUS_OK; - break; - } - case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : { - _qmf::ArgsBrokerQueueMoveMessages& moveArgs= - dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); - QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) - status = Manageable::STATUS_OK; - else - return Manageable::STATUS_PARAMETER_INVALID; - break; - } - case _qmf::Broker::METHOD_SETLOGLEVEL : - setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level); - QPID_LOG (debug, "Broker::setLogLevel()"); - status = Manageable::STATUS_OK; - break; - case _qmf::Broker::METHOD_GETLOGLEVEL : - dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel(); - QPID_LOG (debug, "Broker::getLogLevel()"); - status = Manageable::STATUS_OK; - break; - case _qmf::Broker::METHOD_CREATE : - { - _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args); - createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext()); - status = Manageable::STATUS_OK; - break; - } - case _qmf::Broker::METHOD_DELETE : - { - _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args); - deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext()); - status = Manageable::STATUS_OK; - break; - } - default: - QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); - status = Manageable::STATUS_NOT_IMPLEMENTED; - break; - } - - return status; -} - -namespace -{ -const std::string TYPE_QUEUE("queue"); -const std::string TYPE_EXCHANGE("exchange"); -const std::string TYPE_TOPIC("topic"); -const std::string TYPE_BINDING("binding"); -const std::string DURABLE("durable"); -const std::string AUTO_DELETE("auto-delete"); -const std::string ALTERNATE_EXCHANGE("alternate-exchange"); -const std::string EXCHANGE_TYPE("exchange-type"); -const std::string QUEUE_NAME("queue"); -const std::string EXCHANGE_NAME("exchange"); - -const std::string _TRUE("true"); -const std::string _FALSE("false"); -} - -struct InvalidBindingIdentifier : public qpid::Exception -{ - InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {} - std::string getPrefix() const { return "invalid binding"; } -}; - -struct BindingIdentifier -{ - std::string exchange; - std::string queue; - std::string key; - - BindingIdentifier(const std::string& name) - { - std::vector<std::string> path; - split(path, name, "/"); - switch (path.size()) { - case 1: - queue = path[0]; - break; - case 2: - exchange = path[0]; - queue = path[1]; - break; - case 3: - exchange = path[0]; - queue = path[1]; - key = path[2]; - break; - default: - throw InvalidBindingIdentifier(name); - } - } -}; - -struct ObjectAlreadyExists : public qpid::Exception -{ - ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {} - std::string getPrefix() const { return "object already exists"; } -}; - -struct UnknownObjectType : public qpid::Exception -{ - UnknownObjectType(const std::string& type) : qpid::Exception(type) {} - std::string getPrefix() const { return "unknown object type"; } -}; - -void Broker::createObject(const std::string& type, const std::string& name, - const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) -{ - std::string userId; - std::string connectionId; - if (context) { - userId = context->getUserId(); - connectionId = context->getUrl(); - } - //TODO: implement 'strict' option (check there are no unrecognised properties) - QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); - if (type == TYPE_QUEUE) { - bool durable(false); - bool autodelete(false); - std::string alternateExchange; - Variant::Map extensions; - for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - // extract durable, auto-delete and alternate-exchange properties - if (i->first == DURABLE) durable = i->second; - else if (i->first == AUTO_DELETE) autodelete = i->second; - else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); - //treat everything else as extension properties - else extensions[i->first] = i->second; - } - framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); - - std::pair<boost::shared_ptr<Queue>, bool> result = - createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId); - if (!result.second) { - throw ObjectAlreadyExists(name); - } - } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { - bool durable(false); - std::string exchangeType("topic"); - std::string alternateExchange; - Variant::Map extensions; - for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - // extract durable, auto-delete and alternate-exchange properties - if (i->first == DURABLE) durable = i->second; - else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); - else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); - //treat everything else as extension properties - else extensions[i->first] = i->second; - } - framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); - - try { - std::pair<boost::shared_ptr<Exchange>, bool> result = - createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId); - if (!result.second) { - throw ObjectAlreadyExists(name); - } - } catch (const UnknownExchangeTypeException&) { - throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType)); - } - } else if (type == TYPE_BINDING) { - BindingIdentifier binding(name); - std::string exchangeType("topic"); - Variant::Map extensions; - for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { - // extract durable, auto-delete and alternate-exchange properties - if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); - //treat everything else as extension properties - else extensions[i->first] = i->second; - } - framing::FieldTable arguments; - amqp_0_10::translate(extensions, arguments); - - bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); - } else { - throw UnknownObjectType(type); - } -} - -void Broker::deleteObject(const std::string& type, const std::string& name, - const Variant::Map& options, const ConnectionState* context) -{ - std::string userId; - std::string connectionId; - if (context) { - userId = context->getUserId(); - connectionId = context->getUrl(); - } - QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); - if (type == TYPE_QUEUE) { - deleteQueue(name, userId, connectionId); - } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { - deleteExchange(name, userId, connectionId); - } else if (type == TYPE_BINDING) { - BindingIdentifier binding(name); - unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); - } else { - throw UnknownObjectType(type); - } - -} - -void Broker::setLogLevel(const std::string& level) -{ - QPID_LOG(notice, "Changing log level to " << level); - std::vector<std::string> selectors; - split(selectors, level, ", "); - qpid::log::Logger::instance().reconfigure(selectors); -} - -std::string Broker::getLogLevel() -{ - std::string level; - const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; - for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { - if (i != selectors.begin()) level += std::string(","); - level += *i; - } - return level; -} - -boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const std::string& name) const { - ProtocolFactoryMap::const_iterator i - = name.empty() ? protocolFactories.begin() : protocolFactories.find(name); - if (i == protocolFactories.end()) return boost::shared_ptr<ProtocolFactory>(); - else return i->second; -} - -uint16_t Broker::getPort(const std::string& name) const { - boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name); - if (factory) { - return factory->getPort(); - } else { - throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'")); - } -} - -void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) { - protocolFactories[name] = protocolFactory; - Url::addProtocol(name); -} - -void Broker::accept() { - for (ProtocolFactoryMap::const_iterator i = protocolFactories.begin(); i != protocolFactories.end(); i++) { - i->second->accept(poller, factory.get()); - } -} - -void Broker::connect( - const std::string& host, uint16_t port, const std::string& transport, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* f) -{ - boost::shared_ptr<ProtocolFactory> pf = getProtocolFactory(transport); - if (pf) pf->connect(poller, host, port, f ? f : factory.get(), failed); - else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); -} - -void Broker::connect( - const Url& url, - boost::function2<void, int, std::string> failed, - sys::ConnectionCodec::Factory* f) -{ - url.throwIfEmpty(); - const Address& addr=url[0]; - connect(addr.host, addr.port, addr.protocol, failed, f); -} - -uint32_t Broker::queueMoveMessages( - const std::string& srcQueue, - const std::string& destQueue, - uint32_t qty) -{ - Queue::shared_ptr src_queue = queues.find(srcQueue); - if (!src_queue) - return 0; - Queue::shared_ptr dest_queue = queues.find(destQueue); - if (!dest_queue) - return 0; - - return src_queue->move(dest_queue, qty); -} - - -boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; } - -std::vector<Url> -Broker::getKnownBrokersImpl() -{ - return knownBrokers; -} - -bool Broker::deferDeliveryImpl(const std::string& , - const boost::intrusive_ptr<Message>& ) -{ return false; } - -void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { - clusterTimer = t; -} - -const std::string Broker::TCP_TRANSPORT("tcp"); - - -std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( - const std::string& name, - bool durable, - bool autodelete, - const OwnershipToken* owner, - const std::string& alternateExchange, - const qpid::framing::FieldTable& arguments, - const std::string& userId, - const std::string& connectionId) -{ - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); - params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); - params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); - params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); - params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); - - if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); - } - - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = exchanges.get(alternateExchange); - if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); - } - - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments); - if (result.second) { - //add default binding: - result.first->bind(exchanges.getDefault(), name); - - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent( - _qmf::EventQueueDeclare(connectionId, userId, name, - durable, owner, autodelete, - ManagementAgent::toMap(arguments), - "created")); - } - } - return result; -} - -void Broker::deleteQueue(const std::string& name, const std::string& userId, - const std::string& connectionId, QueueFunctor check) -{ - if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); - } - - Queue::shared_ptr queue = queues.find(name); - if (queue) { - if (check) check(queue); - queues.destroy(name); - queue->destroyed(); - } else { - throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); - } - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); - -} - -std::pair<Exchange::shared_ptr, bool> Broker::createExchange( - const std::string& name, - const std::string& type, - bool durable, - const std::string& alternateExchange, - const qpid::framing::FieldTable& arguments, - const std::string& userId, - const std::string& connectionId) -{ - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_TYPE, type)); - params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); - params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); - params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); - if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); - } - - Exchange::shared_ptr alternate; - if (!alternateExchange.empty()) { - alternate = exchanges.get(alternateExchange); - if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); - } - - std::pair<Exchange::shared_ptr, bool> result; - result = exchanges.declare(name, type, durable, arguments); - if (result.second) { - if (alternate) { - result.first->setAlternate(alternate); - alternate->incAlternateUsers(); - } - if (durable) { - store->create(*result.first, arguments); - } - if (managementAgent.get()) { - //TODO: debatable whether we should raise an event here for - //create when this is a 'declare' event; ideally add a create - //event instead? - managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, - userId, - name, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(arguments), - "created")); - } - } - return result; -} - -void Broker::deleteExchange(const std::string& name, const std::string& userId, - const std::string& connectionId) -{ - if (acl) { - if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); - } - - Exchange::shared_ptr exchange(exchanges.get(name)); - if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); - if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); - if (exchange->isDurable()) store->destroy(*exchange); - if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); - exchanges.destroy(name); - - if (managementAgent.get()) - managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); - -} - -void Broker::bind(const std::string& queueName, - const std::string& exchangeName, - const std::string& key, - const qpid::framing::FieldTable& arguments, - const std::string& userId, - const std::string& connectionId) -{ - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); - - if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); - } - - Queue::shared_ptr queue = queues.find(queueName); - Exchange::shared_ptr exchange = exchanges.get(exchangeName); - if (!queue) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); - } else if (!exchange) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); - } else { - if (queue->bind(exchange, key, arguments)) { - if (managementAgent.get()) { - managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, - queueName, key, ManagementAgent::toMap(arguments))); - } - } - } -} - -void Broker::unbind(const std::string& queueName, - const std::string& exchangeName, - const std::string& key, - const std::string& userId, - const std::string& connectionId) -{ - if (acl) { - std::map<acl::Property, std::string> params; - params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); - params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); - if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) - throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); - } - - Queue::shared_ptr queue = queues.find(queueName); - Exchange::shared_ptr exchange = exchanges.get(exchangeName); - if (!queue) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); - } else if (!exchange) { - throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); - } else { - if (exchange->unbind(queue, key, 0)) { - if (exchange->isDurable() && queue->isDurable()) { - store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); - } - if (managementAgent.get()) { - managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); - } - } - } -} - -}} // namespace qpid::broker - |