/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/broker/Broker.h" #include "qpid/broker/AclModule.h" #include "qpid/broker/BrokerOptions.h" #include "qpid/broker/Connection.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/MessageStoreModule.h" #include "qpid/broker/NameGenerator.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" #include "qpid/broker/SecureConnectionFactory.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/PersistableObject.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/TransactionObserver.h" #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include 
"qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogHiresTimestamp.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogHiresTimestamp.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueRedirect.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventQueueRedirect.h" #include "qmf/org/apache/qpid/broker/EventQueueRedirectCancelled.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/log/Logger.h" #include "qpid/log/Options.h" #include "qpid/log/Statement.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/TransportFactory.h" #include "qpid/sys/Poller.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" #include "qpid/sys/Timer.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/SystemInfo.h" #include "qpid/Address.h" #include "qpid/StringUtils.h" #include "qpid/Url.h" #include "qpid/Version.h" #include #include #include #include using qpid::sys::TransportAcceptor; using qpid::sys::TransportConnector; using qpid::sys::Poller; using qpid::sys::Dispatcher; using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using 
qpid::management::Args; using qpid::management::getCurrentPublisher; using qpid::types::Variant; using std::string; using std::make_pair; namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { const std::string empty; const std::string amq_direct("amq.direct"); const std::string amq_topic("amq.topic"); const std::string amq_fanout("amq.fanout"); const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); const std::string knownHostsNone("none"); BrokerOptions::BrokerOptions(const std::string& name) : qpid::Options(name), noDataDir(0), port(DEFAULT_PORT), workerThreads(5), connectionBacklog(10), enableMgmt(1), mgmtPublish(1), mgmtPubInterval(10*sys::TIME_SEC), queueCleanInterval(60*sys::TIME_SEC*10),//10 minutes auth(SaslAuthenticator::available()), realm("QPID"), replayFlushLimit(0), replayHardLimit(0), queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(true), requireEncrypted(false), knownHosts(knownHostsNone), qmf2Support(true), qmf1Support(false), queueFlowStopRatio(80), queueFlowResumeRatio(70), queueThresholdEventRatio(80), defaultMsgGroup("qpid.no-group"), timestampRcvMsgs(false), // set the 0.10 timestamp delivery property linkMaintenanceInterval(2*sys::TIME_SEC), linkHeartbeatInterval(120*sys::TIME_SEC), dtxDefaultTimeout(60), // 60s dtxMaxTimeout(3600), // 3600s maxNegotiateTime(10000) // 10s { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; std::string home = getHome(); if (home.length() == 0) dataDir += DEFAULT_DATA_DIR_LOCATION; else dataDir += home; dataDir += DEFAULT_DATA_DIR_NAME; addOptions() ("data-dir", optValue(dataDir,"DIR"), "Directory to contain persistent data generated by the broker") ("no-data-dir", optValue(noDataDir), "Don't use a data directory. 
No persistent configuration will be loaded or stored") ("paging-dir", optValue(pagingDir,"DIR"), "Directory in which paging files will be created for paged queues") ("port,p", optValue(port,"PORT"), "Tells the broker to listen on PORT") ("interface", optValue(listenInterfaces, "|"), "Which network interfaces to use to listen for incoming connections") ("listen-disable", optValue(listenDisabled, ""), "Transports to disable listening") ("worker-threads", optValue(workerThreads, "N"), "Sets the broker thread pool size") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted") ("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication") ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "DIR"), "Allows SASL config path, if supported by platform, to be overridden. 
For default location on Linux, see Cyrus SASL documentation. There is no SASL config dir on Windows.") ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message.") ("link-maintenance-interval", optValue(linkMaintenanceInterval, "SECONDS"), "Interval to check federation link health and re-connect if need be") ("link-heartbeat-interval", optValue(linkHeartbeatInterval, "SECONDS"), "Heartbeat interval for a federation link") ("dtx-default-timeout", optValue(dtxDefaultTimeout, "SECONDS"), "Default timeout for DTX transaction before aborting it") ("dtx-max-timeout", optValue(dtxMaxTimeout, "SECONDS"), "Maximum allowed timeout for DTX transaction. A value of zero disables maximum timeout limit checks and allows arbitrarily large timeout settings.") ("max-negotiate-time", optValue(maxNegotiateTime, "MILLISECONDS"), "Maximum time a connection can take to send the initial protocol negotiation") ("federation-tag", optValue(fedTag, "NAME"), "Override the federation tag") ; } namespace { // Arguments to declare a non-replicated exchange. 
framing::FieldTable noReplicateArgs() {
    framing::FieldTable args;
    // The returned table holds the single entry "qpid.replicate" = "none".
    args.setString("qpid.replicate", "none");
    return args;
}
}

/**
 * Initialise the prefix string to "Broker (pid=<process id>) " and log a
 * "start-up" notice that begins with it.
 */
Broker::LogPrefix::LogPrefix() :
    std::string(Msg() << "Broker (pid=" << sys::SystemInfo::getProcessId() << ") ")
{
    QPID_LOG(notice, *this << "start-up");
}

/** Log a "shut-down" notice when the prefix object is destroyed. */
Broker::LogPrefix::~LogPrefix() {
    QPID_LOG(notice, *this << "shut-down");
}

/**
 * Construct a broker from the given options.
 *
 * The initializer list creates the member subsystems. The body then, inside
 * one try block: configures the management agent and its broker/vhost objects
 * (when conf.enableMgmt is set), chooses the federation tag, early-initializes
 * plugins, runs store recovery, declares the standard exchanges, initialises
 * SASL (when conf.auth is set), initializes plugins, restores recovered
 * objects and starts the queue cleaner.
 *
 * @param conf broker options; a copy is kept in the `config` member.
 *
 * Any std::exception raised inside the try block is logged as critical,
 * finalize() is called, and the exception is rethrown.
 */
Broker::Broker(const BrokerOptions& conf) :
    poller(new Poller),
    timer(new qpid::sys::Timer),
    config(conf),
    // No management agent is created (null pointer) when management is disabled.
    managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support,
                                                          conf.qmf2Support)
                                    : 0),
    disabledListeningTransports(conf.listenDisabled.begin(), conf.listenDisabled.end()),
    // Start with a NullMessageStore; the body tests for it with isNullStore().
    store(new NullMessageStore),
    acl(0),
    // An empty path is passed when conf.noDataDir is set.
    dataDir(conf.noDataDir ? std::string() : conf.dataDir),
    // Paging directory: the explicit setting if given, otherwise the data
    // directory path + DEFAULT_PAGED_QUEUE_DIR when a data directory is
    // enabled, otherwise empty.
    pagingDir(!conf.pagingDir.empty() ? conf.pagingDir :
              dataDir.isEnabled() ? dataDir.getPath() + BrokerOptions::DEFAULT_PAGED_QUEUE_DIR :
              std::string() ),
    queues(this),
    exchanges(this),
    links(this),
    factory(new SecureConnectionFactory(*this)),
    dtxManager(*timer.get(), conf.dtxDefaultTimeout),
    sessionManager(
        qpid::SessionState::Configuration(
            conf.replayFlushLimit*1024, // convert kb to bytes.
            conf.replayHardLimit*1024),
        *this),
    queueCleaner(queues, poller, timer.get()),
    recoveryInProgress(false),
    timestampRcvMsgs(conf.timestampRcvMsgs),
    getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
    if (!dataDir.isEnabled()) {
        QPID_LOG (info, "No data directory - Disabling persistent configuration");
    }

    try {
        if (conf.enableMgmt) {
            QPID_LOG(info, "Management enabled");
            // The publish interval is converted from a Duration to seconds.
            managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish,
                                       conf.mgmtPubInterval/sys::TIME_SEC, this, conf.workerThreads + 3);
            managementAgent->setName("apache.org", "qpidd");
            _qmf::Package packageInitializer(managementAgent.get());

            System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : string(), this);
            systemObject = System::shared_ptr(system);

            // Create the broker management object and copy the relevant
            // options onto it before registering it with the agent.
            mgmtObject = _qmf::Broker::shared_ptr(new _qmf::Broker(managementAgent.get(), this, system, "amqp-broker"));
            mgmtObject->set_systemRef(system->GetManagementObject()->getObjectId());
            mgmtObject->set_port(conf.port);
            mgmtObject->set_workerThreads(conf.workerThreads);
            mgmtObject->set_connBacklog(conf.connectionBacklog);
            mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval/sys::TIME_SEC);
            mgmtObject->set_mgmtPublish(conf.mgmtPublish);
            mgmtObject->set_version(qpid::version);
            if (dataDir.isEnabled())
                mgmtObject->set_dataDir(dataDir.getPath());
            else
                mgmtObject->clr_dataDir();

            managementAgent->addObject(mgmtObject, 0, true);

            // Since there is currently no support for virtual hosts, a placeholder object
            // representing the implied single virtual host is added here to keep the
            // management schema correct.
            Vhost* vhost = new Vhost(this, this);
            vhostObject = Vhost::shared_ptr(vhost);
            // Federation tag: the configured tag, or else the management
            // agent's uuid rendered as a string.
            if (conf.fedTag.empty()) {
                framing::Uuid uuid(managementAgent->getUuid());
                federationTag = uuid.str();
            } else
                federationTag = conf.fedTag;
            vhostObject->setFederationTag(federationTag);

            queues.setParent(vhost);
            exchanges.setParent(vhost);
            links.setParent(vhost);
        } else {
            // Management is disabled so there is no broker management ID.
            // Create a unique uuid to use as the federation tag.
            if (conf.fedTag.empty()) {
                framing::Uuid uuid(true);
                federationTag = uuid.str();
            } else
                federationTag = conf.fedTag;
        }

        // Early-Initialize plugins
        Plugin::earlyInitAll(*this);

        QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio);
        MessageGroupManager::setDefaults(conf.defaultMsgGroup);

        // If no plugin store module registered itself, set up the null store.
        if (NullMessageStore::isNullStore(store.get()))
            setStore();

        // NOTE(review): `args` is not referenced again in this function.
        framing::FieldTable args;

        // Default exchange is not replicated.
        exchanges.declare(empty, DirectExchange::typeName, false, false, noReplicateArgs());

        RecoveredObjects objects;
        if (store.get() != 0) {
            RecoveryManagerImpl recoverer(
                queues, exchanges, links, dtxManager, protocolRegistry, objects);
            // recoveryInProgress is true only for the duration of recover().
            recoveryInProgress = true;
            store->recover(recoverer);
            recoveryInProgress = false;
        }

        //ensure standard exchanges exist (done after recovery from store)
        declareStandardExchange(amq_direct, DirectExchange::typeName);
        declareStandardExchange(amq_topic, TopicExchange::typeName);
        declareStandardExchange(amq_fanout, FanOutExchange::typeName);
        declareStandardExchange(amq_match, HeadersExchange::typeName);

        if(conf.enableMgmt) {
            // NOTE(review): the template arguments of the dynamic_pointer_cast
            // calls and std::pair declarations below are not visible in this
            // copy of the file - confirm against the repository copy.
            exchanges.declare(qpid_management, ManagementTopicExchange::typeName, false, false, noReplicateArgs());
            Exchange::shared_ptr mExchange = exchanges.get(qpid_management);
            Exchange::shared_ptr dExchange = exchanges.get(amq_direct);
            managementAgent->setExchange(mExchange, dExchange);
            boost::dynamic_pointer_cast(mExchange)->setManagmentAgent(managementAgent.get(), 1);

            // Declare the two "qmf.default.*" exchanges and hand them to the
            // agent via setExchangeV2().
            std::string qmfTopic("qmf.default.topic");
            std::string qmfDirect("qmf.default.direct");
            std::pair topicPair(
                exchanges.declare(qmfTopic, ManagementTopicExchange::typeName, false, false, noReplicateArgs()));
            std::pair directPair(
                exchanges.declare(qmfDirect, ManagementDirectExchange::typeName, false, false, noReplicateArgs()));
            boost::dynamic_pointer_cast(directPair.first)->setManagmentAgent(managementAgent.get(), 2);
            boost::dynamic_pointer_cast(topicPair.first)->setManagmentAgent(managementAgent.get(), 2);
            managementAgent->setExchangeV2(topicPair.first, directPair.first);
        }
        else
            QPID_LOG(info, "Management not enabled");

        // this feature affects performance, so let's be sure that gets logged!
        if (conf.timestampRcvMsgs) {
            QPID_LOG(notice, "Receive message timestamping is ENABLED.");
        }

        /**
         * SASL setup, can fail and terminate startup
         */
        if (conf.auth) {
            SaslAuthenticator::init(qpid::saslName, conf.saslConfigPath);
            QPID_LOG(info, "SASL enabled");
        } else {
            QPID_LOG(notice, "SASL disabled: No Authentication Performed");
        }

        // Initialize plugins
        Plugin::initializeAll(*this);

        //recover any objects via object factories
        objects.restore(*this);

        // Assign to queues their users who created them (can be done after ACL is loaded in Plugin::initializeAll above)
        if ((getAcl()) && (store.get())) {
            queues.eachQueue(boost::bind(&qpid::broker::Queue::updateAclUserQueueCount, _1));
        }

        if(conf.enableMgmt) {
            if (getAcl()) {
                mgmtObject->set_maxConns(getAcl()->getMaxConnectTotal());
            }
        }

        // A zero interval leaves the queue cleaner unstarted.
        if (conf.queueCleanInterval) {
            queueCleaner.start(conf.queueCleanInterval);
        }

        // "none" (knownHostsNone) and the empty string both leave knownBrokers untouched.
        if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) {
            knownBrokers.push_back(Url(conf.knownHosts));
        }
    } catch (const std::exception& e) {
        QPID_LOG(critical, logPrefix << "start-up failed: " << e.what());
        finalize();
        throw;
    }
    QPID_LOG(info, logPrefix << "initialized");
}

/**
 * Declare one of the standard exchanges, marked as not replicated, and ask
 * the store to create it when a store is present and the declare call
 * reports `second == true`.
 *
 * @param name exchange name.
 * @param type exchange type name.
 */
void Broker::declareStandardExchange(const std::string& name, const std::string& type)
{
    bool storeEnabled = store.get() != NULL;
    // NOTE(review): `args` is not referenced again in this function.
    framing::FieldTable args;
    // Standard exchanges are not replicated.
std::pair status = exchanges.declare(name, type, storeEnabled, false, noReplicateArgs());
    // NOTE(review): the template arguments of `std::pair` above (and of the
    // vector / intrusive_ptr / shared_ptr types further down) are not visible
    // in this copy of the file - confirm against the repository copy.
    // The store is only asked to create the exchange when the declare call
    // reported `second == true` and a store is present.
    if (status.second && storeEnabled) {
        store->create(*status.first, framing::FieldTable ());
    }
}

// ---- Read-only accessors over the BrokerOptions copy held in `config` ----

/** @return config.auth. */
bool Broker::isAuthenticating() const {
    return config.auth;
}

/** @return config.requireEncrypted. */
bool Broker::requireEncrypted() const {
    return config.requireEncrypted;
}

/** @return a copy of config.realm. */
std::string Broker::getRealm() const {
    return config.realm;
}

/** @return config.tcpNoDelay. */
bool Broker::getTcpNoDelay() const {
    return config.tcpNoDelay;
}

/** @return config.maxNegotiateTime. */
uint32_t Broker::getMaxNegotiateTime() const {
    return config.maxNegotiateTime;
}

/** @return config.port, i.e. the configured port option. */
uint16_t Broker::getPortOption() const {
    return config.port;
}

/** @return a reference to config.listenInterfaces. */
const std::vector& Broker::getListenInterfaces() const {
    return config.listenInterfaces;
}

/** @return config.connectionBacklog. */
int Broker::getConnectionBacklog() const {
    return config.connectionBacklog;
}

/** @return config.linkMaintenanceInterval. */
sys::Duration Broker::getLinkMaintenanceInterval() const {
    return config.linkMaintenanceInterval;
}

/** @return config.linkHeartbeatInterval. */
sys::Duration Broker::getLinkHeartbeatInterval() const {
    return config.linkHeartbeatInterval;
}

/** @return config.dtxMaxTimeout. */
uint32_t Broker::getDtxMaxTimeout() const {
    return config.dtxMaxTimeout;
}

/** @return config.queueThresholdEventRatio. */
uint16_t Broker::getQueueThresholdEventRatio() const {
    return config.queueThresholdEventRatio;
}

/** @return config.queueLimit. */
uint Broker::getQueueLimit() const {
    return config.queueLimit;
}

/**
 * Create a broker with default options except for the listening port.
 *
 * @param port value assigned to BrokerOptions::port.
 * @return the result of create(const BrokerOptions&).
 */
boost::intrusive_ptr Broker::create(int16_t port)
{
    BrokerOptions config;
    config.port=port;
    return create(config);
}

/**
 * Create a broker from the given options.
 *
 * @param opts options passed to the Broker constructor.
 * @return an intrusive_ptr wrapping the newly allocated Broker.
 */
boost::intrusive_ptr Broker::create(const BrokerOptions& opts)
{
    return boost::intrusive_ptr(new Broker(opts));
}

/**
 * Install a message store, wrapped in a MessageStoreModule, and propagate it
 * to the queue registry, DTX manager and link registry.
 *
 * @param _store the store to wrap.
 * @throws Exception if the current store is no longer the null store, i.e.
 *         a store has already been installed.
 */
void Broker::setStore (const boost::shared_ptr& _store)
{
    // Exit now if multiple store plugins are attempting to load
    if (!NullMessageStore::isNullStore(store.get())) {
        QPID_LOG(error, "Multiple store plugins are not supported");
        throw Exception(QPID_MSG("Failed to start broker: Multiple store plugins were loaded"));
    }
    store.reset(new MessageStoreModule (_store));
    setStore();
}

/** Hand the current store pointer to queues, dtxManager and links. */
void Broker::setStore () {
    queues.setStore (store.get());
    dtxManager.setStore (store.get());
    links.setStore (store.get());
}

void Broker::run() {
    if (config.workerThreads > 0) {
        QPID_LOG(info,
logPrefix << "running");
        Dispatcher d(poller);
        int numIOThreads = config.workerThreads;
        std::vector t(numIOThreads-1);

        // Run n-1 io threads
        // NOTE(review): text appears to have been lost from this copy of the
        // file between "i" and "shutdown()" on the next line (as has the
        // element type of `t` above). It is kept exactly as found - restore
        // it from the repository copy.
        for (int i=0; ishutdown();
}

/**
 * Shut the broker down: log, dump management stats when a management object
 * exists, call shutdown() and finalize(), release SASL when authentication
 * was enabled, stop the timer and drop the management agent.
 */
Broker::~Broker() {
    QPID_LOG(info, logPrefix << "shutting down");
    if (mgmtObject != 0)
        mgmtObject->debugStats("destroying");
    shutdown();
    finalize();                 // Finalize any plugins.
    if (config.auth)
        SaslAuthenticator::fini();
    timer->stop();
    managementAgent.reset();
}

/** @return the broker's management object (mgmtObject). */
ManagementObject::shared_ptr Broker::GetManagementObject(void) const
{
    return mgmtObject;
}

/** @return the raw pointer held by vhostObject. */
Manageable* Broker::GetVhostObject(void) const
{
    return vhostObject.get();
}

/**
 * Dispatch a management method invoked on the broker object.
 *
 * @param methodId one of the _qmf::Broker::METHOD_* identifiers.
 * @param args     method arguments; each case down-casts them by reference
 *                 to the matching _qmf::ArgsBroker* type, so a mismatched
 *                 type raises std::bad_cast. Output fields are written in
 *                 place.
 * @param text     set to a short error description by the connect case when
 *                 it fails.
 * @return STATUS_OK on success; STATUS_NOT_IMPLEMENTED for an unrecognised
 *         methodId or an unsupported transport; STATUS_PARAMETER_INVALID when
 *         a link cannot be created or queueMoveMessages() reports a negative
 *         result; for query, timestamp-config and queue-redirect the status
 *         returned by the delegated call.
 *
 * Exceptions thrown by the delegated calls (for example createObject and
 * deleteObject) are not caught here.
 */
Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
                                               Args&    args,
                                               string&  text)
{
    Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;

    switch (methodId)
    {
    case _qmf::Broker::METHOD_ECHO :
        // The casts stay inside the log statement, exactly as before, so they
        // are evaluated under the same conditions as the original code.
        QPID_LOG (debug, "Broker::echo("
                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence
                  << ", "
                  << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body
                  << ")");
        status = Manageable::STATUS_OK;
        break;

    case _qmf::Broker::METHOD_CONNECT : {
        /** Management is creating a Link to a remote broker using the host and port of
         * the remote.  This (old) interface does not allow management to specify a name
         * for the link, nor does it allow multiple Links to the same remote.  Use the
         * "create()" broker method if these features are needed.
         * TBD: deprecate this interface.
         */
        QPID_LOG(info, "The Broker::connect() method will be removed in a future release of QPID."
                 " Please use the Broker::create() method with type='link' instead.");
        _qmf::ArgsBrokerConnect& hp=
            dynamic_cast<_qmf::ArgsBrokerConnect&>(args);

        // An empty transport argument selects TCP.
        string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport;
        QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport <<
                  "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\"");
        if (!getTransportInfo(transport).connectorFactory) {
            QPID_LOG(error, "Transport '" << transport << "' not supported");
            text = "transport type not supported";
            return Manageable::STATUS_NOT_IMPLEMENTED;
        }

        // Does a link to the remote already exist?  If so, re-use the existing link
        // - this behavior is backward compatible with previous releases.
        if (!links.getLink(hp.i_host, hp.i_port, transport)) {
            // new link, need to generate a unique name for it
            // NOTE(review): the template arguments of `std::pair` are not
            // visible in this copy of the file.
            std::pair response =
                links.declare(Link::createName(transport, hp.i_host, hp.i_port),
                              hp.i_host, hp.i_port, transport,
                              hp.i_durable, hp.i_authMechanism, hp.i_username, hp.i_password);
            if (!response.first) {
                text = "Unable to create Link";
                status = Manageable::STATUS_PARAMETER_INVALID;
                break;
            }
        }
        status = Manageable::STATUS_OK;
        break;
    }

    case _qmf::Broker::METHOD_QUEUEMOVEMESSAGES : {
        _qmf::ArgsBrokerQueueMoveMessages& moveArgs=
            dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args);
        QPID_LOG (debug, "Broker::queueMoveMessages()");
        if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty,
                              moveArgs.i_filter, getCurrentPublisher()) >=0)
            status = Manageable::STATUS_OK;
        else
            return Manageable::STATUS_PARAMETER_INVALID;
        break;
    }

    case _qmf::Broker::METHOD_SETLOGLEVEL :
        setLogLevel(dynamic_cast<_qmf::ArgsBrokerSetLogLevel&>(args).i_level);
        QPID_LOG (debug, "Broker::setLogLevel()");
        status = Manageable::STATUS_OK;
        break;

    case _qmf::Broker::METHOD_GETLOGLEVEL :
        dynamic_cast<_qmf::ArgsBrokerGetLogLevel&>(args).o_level = getLogLevel();
        QPID_LOG (debug, "Broker::getLogLevel()");
        status = Manageable::STATUS_OK;
        break;

    case _qmf::Broker::METHOD_CREATE : {
        _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args);
        createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getCurrentPublisher());
        status = Manageable::STATUS_OK;
        break;
    }

    case _qmf::Broker::METHOD_DELETE : {
        _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args);
        deleteObject(a.i_type, a.i_name, a.i_options, getCurrentPublisher());
        status = Manageable::STATUS_OK;
        break;
    }

    case _qmf::Broker::METHOD_QUERY : {
        _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args);
        status = queryObject(a.i_type, a.i_name, a.o_results, getCurrentPublisher());
        break;
    }

    case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG: {
        _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args);
        status = getTimestampConfig(a.o_receive, getCurrentPublisher());
        break;
    }

    case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG: {
        _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args);
        status = setTimestampConfig(a.i_receive, getCurrentPublisher());
        break;
    }

    case _qmf::Broker::METHOD_GETLOGHIRESTIMESTAMP: {
        dynamic_cast<_qmf::ArgsBrokerGetLogHiresTimestamp&>(args).o_logHires = getLogHiresTimestamp();
        QPID_LOG (debug, "Broker::getLogHiresTimestamp()");
        status = Manageable::STATUS_OK;
        break;
    }

    case _qmf::Broker::METHOD_SETLOGHIRESTIMESTAMP: {
        setLogHiresTimestamp(dynamic_cast<_qmf::ArgsBrokerSetLogHiresTimestamp&>(args).i_logHires);
        QPID_LOG (debug, "Broker::setLogHiresTimestamp()");
        status = Manageable::STATUS_OK;
        break;
    }

    case _qmf::Broker::METHOD_QUEUEREDIRECT: {
        // Down-cast once and copy both queue names out of the arguments.
        _qmf::ArgsBrokerQueueRedirect& redirect =
            dynamic_cast<_qmf::ArgsBrokerQueueRedirect&>(args);
        string srcQueue(redirect.i_sourceQueue);
        string tgtQueue(redirect.i_targetQueue);
        QPID_LOG (debug, "Broker::queueRedirect source queue:" << srcQueue << " to target queue " << tgtQueue);
        status = queueRedirect(srcQueue, tgtQueue, getCurrentPublisher());
        break;
    }

    case _qmf::Broker::METHOD_SHUTDOWN : {
        QPID_LOG (info, "Broker received shutdown command");
        shutdown();
        // Previously this case fell through into `default`, so a shutdown
        // that had just been initiated was logged as "not implemented" and
        // reported as STATUS_NOT_IMPLEMENTED. Report success instead.
        status = Manageable::STATUS_OK;
        break;
    }

    default:
        QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]");
        status = Manageable::STATUS_NOT_IMPLEMENTED;
        break;
    }

    return status;
}

namespace
{
// Object type names accepted by the create/delete management methods.
const std::string TYPE_QUEUE("queue");
const std::string TYPE_EXCHANGE("exchange");
const std::string TYPE_TOPIC("topic");
const std::string TYPE_BINDING("binding");
const std::string TYPE_LINK("link");
const std::string TYPE_BRIDGE("bridge");

// Property keys and values.
const std::string DURABLE("durable");
const std::string AUTO_DELETE("auto-delete");
const std::string ALTERNATE_EXCHANGE("alternate-exchange");
const std::string EXCHANGE_TYPE("exchange-type");
const std::string QUEUE_NAME("queue");
const std::string EXCHANGE_NAME("exchange");

const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10");

const std::string _TRUE("true");
const std::string _FALSE("false");

// parameters for creating a Link object, see mgmt schema
const std::string HOST("host");
const std::string PORT("port");
const std::string TRANSPORT("transport");
const std::string AUTH_MECHANISM("authMechanism");
const std::string USERNAME("username");
const std::string PASSWORD("password");

// parameters for creating a Bridge object, see mgmt schema
const std::string LINK("link");
const std::string SRC("src");
const std::string DEST("dest");
const std::string KEY("key");
const std::string TAG("tag");
const std::string EXCLUDES("excludes");
const std::string SRC_IS_QUEUE("srcIsQueue");
const std::string SRC_IS_LOCAL("srcIsLocal");
const std::string DYNAMIC("dynamic");
const std::string SYNC("sync");
const std::string CREDIT("credit");

// parameters for deleting a Queue object
const std::string IF_EMPTY("if_empty");
const std::string IF_UNUSED("if_unused");
}

/** Thrown when a binding name does not split into 1, 2 or 3 '/'-separated parts. */
struct InvalidBindingIdentifier : public qpid::Exception
{
    InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {}
    std::string getPrefix() const { return "invalid binding"; }
};

/**
 * A binding name split on "/" into its parts: "queue", "exchange/queue" or
 * "exchange/queue/key". Any other number of parts throws
 * InvalidBindingIdentifier.
 */
struct BindingIdentifier
{
    std::string exchange;
    std::string queue;
    std::string key;

    BindingIdentifier(const std::string& name)
    {
        // NOTE(review): the element type of `path` is not visible in this
        // copy of the file.
        std::vector path;
        split(path, name, "/");
        switch (path.size()) {
          case
1:
            // "queue"
            queue = path[0];
            break;
          case 2:
            // "exchange/queue"
            exchange = path[0];
            queue = path[1];
            break;
          case 3:
            // "exchange/queue/key"
            exchange = path[0];
            queue = path[1];
            key = path[2];
            break;
          default:
            throw InvalidBindingIdentifier(name);
        }
    }
};

/** Thrown by createObject when the named object already exists. */
struct ObjectAlreadyExists : public qpid::Exception
{
    ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {}
    std::string getPrefix() const { return "object already exists"; }
};

/** Thrown by createObject when the type string matches no supported object type. */
struct UnknownObjectType : public qpid::Exception
{
    UnknownObjectType(const std::string& type) : qpid::Exception(type) {}
    std::string getPrefix() const { return "unknown object type"; }
};

/** Thrown when a link or bridge name starts with QPID_NAME_PREFIX. */
struct ReservedObjectName : public qpid::Exception
{
    ReservedObjectName(const std::string& type) : qpid::Exception(type) {}
    std::string getPrefix() const { return std::string("names prefixed with '")
          + QPID_NAME_PREFIX + std::string("' are reserved"); }
};

/** Thrown when the requested transport has no connector factory. */
struct UnsupportedTransport : public qpid::Exception
{
    UnsupportedTransport(const std::string& type) : qpid::Exception(type) {}
    std::string getPrefix() const { return "transport is not supported"; }
};

/** Thrown when a link or bridge cannot be created from the supplied parameters. */
struct InvalidParameter : public qpid::Exception
{
    InvalidParameter(const std::string& type) : qpid::Exception(type) {}
    std::string getPrefix() const { return "invalid parameter to method call"; }
};

/**
 * Create a broker object of the given type on behalf of management.
 *
 * Registered object factories are tried first; if none handles the request
 * the built-in types "queue", "exchange"/"topic", "binding", "link" and
 * "bridge" are handled here.
 *
 * @param type       object type name.
 * @param name       object name; for bindings it is parsed by BindingIdentifier.
 * @param properties creation properties; recognised keys are extracted and,
 *                   for queues, exchanges and bindings, the remaining entries
 *                   are passed on as extension arguments.
 * @param strict     currently ignored (see the TODO below).
 * @param context    originating connection, may be null; supplies the user id
 *                   and management id used for the operation.
 *
 * @throws ObjectAlreadyExists  if the queue, exchange, link or bridge exists.
 * @throws ReservedObjectName   if a link or bridge name uses QPID_NAME_PREFIX.
 * @throws UnsupportedTransport if the link transport has no connector factory.
 * @throws InvalidParameter     if a link or bridge cannot be declared, or the
 *                              bridge's link is missing.
 * @throws UnknownObjectType    if the type is not recognised.
 * @throws Exception            if the exchange type is unknown.
 */
void Broker::createObject(const std::string& type, const std::string& name,
                          const Variant::Map& properties, bool /*strict*/, const Connection* context)
{
    std::string userId;
    std::string connectionId;
    if (context) {
        userId = context->getUserId();
        connectionId = context->getMgmtId();
    }
    //TODO: implement 'strict' option (check there are no unrecognised properties)
    QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
    // NOTE(review): several std::pair / boost::shared_ptr declarations below
    // have lost their template arguments in this copy of the file; they are
    // reproduced as found - confirm against the repository copy.
    if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) {
        QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory");
    } else if (type == TYPE_QUEUE) {
        bool durable(false);
        bool autodelete(false);
        std::string alternateExchange;
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract durable, auto-delete and alternate-exchange properties
            if (i->first == DURABLE) durable = i->second;
            else if (i->first == AUTO_DELETE) autodelete = i->second;
            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        QueueSettings settings(durable, autodelete);
        // Entries that populate() does not consume are collected in `unused`
        // and forwarded to the store settings.
        Variant::Map unused;
        settings.populate(extensions, unused);
        qpid::amqp_0_10::translate(unused, settings.storeSettings);
        //TODO: unused doesn't take store settings into account... so can't yet implement strict
        QPID_LOG(debug, "Broker did not use the following settings (store module may): " << unused);

        std::pair, bool> result =
            createQueue(name, settings, 0, alternateExchange, userId, connectionId);
        if (!result.second) {
            throw ObjectAlreadyExists(name);
        }
    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
        bool durable(false);
        bool autodelete(false);
        // The exchange type defaults to "topic" when no exchange-type property is given.
        std::string exchangeType("topic");
        std::string alternateExchange;
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract durable, auto-delete, exchange-type and alternate-exchange properties
            if (i->first == DURABLE) durable = i->second;
            else if (i->first == AUTO_DELETE) autodelete = i->second;
            else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
            else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        framing::FieldTable arguments;
        qpid::amqp_0_10::translate(extensions, arguments);

        try {
            std::pair, bool> result =
                createExchange(name, exchangeType, durable, autodelete, alternateExchange, arguments, userId, connectionId);
            if (!result.second) {
                throw ObjectAlreadyExists(name);
            }
        } catch (const UnknownExchangeTypeException&) {
            // Re-reported as a generic Exception carrying the offending type name.
            throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType));
        }
    } else if (type == TYPE_BINDING) {
        BindingIdentifier binding(name);
        // NOTE(review): exchangeType is extracted below but not passed to bind().
        std::string exchangeType("topic");
        Variant::Map extensions;
        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            // extract the exchange-type property
            if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString();
            //treat everything else as extension properties
            else extensions[i->first] = i->second;
        }
        framing::FieldTable arguments;
        qpid::amqp_0_10::translate(extensions, arguments);

        bind(binding.queue, binding.exchange, binding.key, arguments, 0, userId, connectionId);

    } else if (type == TYPE_LINK) {

        QPID_LOG (debug, "createObject: Link; name=" << name << "; args=" << properties );

        // Reject names that start with the reserved prefix.
        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
            QPID_LOG(error, "Link name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
            throw ReservedObjectName(name);
        }

        std::string host;
        uint16_t port = 0;
        std::string transport = TCP_TRANSPORT;
        bool durable = false;
        std::string authMech, username, password;

        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
            if (i->first == HOST) host = i->second.asString();
            else if (i->first == PORT) port = i->second.asUint16();
            else if (i->first == TRANSPORT) transport = i->second.asString();
            else if (i->first == DURABLE) durable = bool(i->second);
            else if (i->first == AUTH_MECHANISM) authMech = i->second.asString();
            else if (i->first == USERNAME) username = i->second.asString();
            else if (i->first == PASSWORD) password = i->second.asString();
            else {
                // TODO: strict checking here
            }
        }

        if (!getTransportInfo(transport).connectorFactory) {
            QPID_LOG(error, "Transport '" << transport << "' not supported.");
            throw UnsupportedTransport(transport);
        }

        // rc.first empty: declare failed; rc.second false: the name already exists.
        std::pair, bool> rc;
        rc = links.declare(name, host, port, transport, durable, authMech, username, password);
        if (!rc.first) {
            QPID_LOG (error, "Failed to create Link object, name=" << name << " remote=" << host << ":" << port <<
                      "; transport=" << transport << "; durable=" << (durable?"T":"F") << "; authMech=\"" << authMech << "\"");
            throw InvalidParameter(name);
        }
        if (!rc.second) {
            QPID_LOG (error, "Failed to create a new Link object, name=" << name << " already exists.");
            throw ObjectAlreadyExists(name);
        }

    } else if (type == TYPE_BRIDGE) {

        QPID_LOG (debug, "createObject: Bridge; name=" << name << "; args=" << properties );

        // Reject names that start with the reserved prefix.
        if (name.compare(0, QPID_NAME_PREFIX.length(), QPID_NAME_PREFIX) == 0) {
            QPID_LOG(error, "Bridge name='" << name << "' cannot use the reserved prefix '" << QPID_NAME_PREFIX << "'");
            throw ReservedObjectName(name);
        }

        std::string linkName;
        std::string src;
        std::string dest;
        std::string key;
        std::string id;
        std::string excludes;
        std::string queueName;
        bool durable = false;
        bool srcIsQueue = false;
        bool srcIsLocal = false;
        bool dynamic = false;
        uint16_t sync = 0;
        uint32_t credit = LinkRegistry::INFINITE_CREDIT;

        for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {

            if (i->first == LINK) linkName = i->second.asString();
            else if (i->first == SRC) src = i->second.asString();
            else if (i->first == DEST) dest = i->second.asString();
            else if (i->first == KEY) key = i->second.asString();
            else if (i->first == TAG) id = i->second.asString();
            else if (i->first == EXCLUDES) excludes = i->second.asString();
            else if (i->first == SRC_IS_QUEUE) srcIsQueue = bool(i->second);
            else if (i->first == SRC_IS_LOCAL) srcIsLocal = bool(i->second);
            else if (i->first == DYNAMIC) dynamic = bool(i->second);
            else if (i->first == SYNC) sync = i->second.asUint16();
            else if (i->first == CREDIT) credit = i->second.asUint32();
            else if (i->first == DURABLE) durable = bool(i->second);
            else if (i->first == QUEUE_NAME) queueName = i->second.asString();
            else {
                // TODO: strict checking here
            }
        }

        // The bridge must name an existing link.
        boost::shared_ptr link;
        if (linkName.empty() || !(link = links.getLink(linkName))) {
            QPID_LOG(error, "Link '" << linkName << "' not found; bridge create failed.");
            throw InvalidParameter(name);
        }
        // rc.first empty: declare failed; rc.second false: the name already exists.
        std::pair rc =
          links.declare(name, *link, durable, src, dest, key, srcIsQueue, srcIsLocal, id, excludes,
                        dynamic, sync, credit,
                        0,
                        queueName);

        if (!rc.first) {
            QPID_LOG (error, "Failed to create Bridge object, name=" << name << " link=" << linkName <<
                      "; src=" << src << "; dest=" << dest << "; key=" << key);
            throw InvalidParameter(name);
        }
        if (!rc.second) {
            QPID_LOG (error, "Failed to create a new Bridge object, name=" << name << " already exists.");
            throw ObjectAlreadyExists(name);
        }
    } else {
        throw UnknownObjectType(type);
    }
}

void Broker::deleteObject(const std::string& type, const std::string& name,
                          const Variant::Map& options, const Connection* context)
{
    std::string userId;
    std::string connectionId;
    if (context) {
        userId = context->getUserId();
        connectionId = context->getMgmtId();
    }
    QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
    // Registered object factories get the first chance to handle the request.
    if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
        QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ") handled by registered factory");
    } else if (type == TYPE_QUEUE) {
        // extract ifEmpty and ifUnused from options
        bool ifUnused = false, ifEmpty = false;
        for (Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
            if (i->first == IF_UNUSED) ifUnused = i->second.asBool();
            else if (i->first == IF_EMPTY) ifEmpty = i->second.asBool();
        }
        deleteQueue(name, userId, connectionId,
                    boost::bind(&Broker::checkDeleteQueue, this, _1, ifUnused, ifEmpty));
    } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) {
        deleteExchange(name, userId, connectionId);
    } else if (type == TYPE_BINDING) {
        BindingIdentifier binding(name);
        unbind(binding.queue, binding.exchange, binding.key, 0, userId, connectionId);
    } else if (type == TYPE_LINK) {
        // A link that cannot be found is silently ignored.
        boost::shared_ptr link = links.getLink(name);
        if (link) {
            link->close();
        }
    } else if (type
== TYPE_BRIDGE) { boost::shared_ptr bridge = links.getBridge(name); if (bridge) { bridge->close(); } } else { throw UnknownObjectType(type); } } void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmpty) { if(ifEmpty && queue->getMessageCount() > 0) { throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue " << queue->getName() << "; queue not empty")); } else if(ifUnused && queue->getConsumerCount() > 0) { throw qpid::framing::PreconditionFailedException(QPID_MSG("Cannot delete queue " << queue->getName() << "; queue in use")); } } Manageable::status_t Broker::queryObject(const std::string& type, const std::string& name, Variant::Map& results, const Connection* context) { std::string userId; std::string connectionId; if (context) { userId = context->getUserId(); connectionId = context->getMgmtId(); } QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); if (type == TYPE_QUEUE) return queryQueue( name, userId, connectionId, results ); if (type == TYPE_EXCHANGE || type == TYPE_TOPIC || type == TYPE_BINDING) return Manageable::STATUS_NOT_IMPLEMENTED; throw UnknownObjectType(type); } Manageable::status_t Broker::queryQueue( const std::string& name, const std::string& userId, const std::string& /*connectionId*/, Variant::Map& results ) { (void) results; if (acl) { if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId)); } boost::shared_ptr q(queues.find(name)); if (!q) { QPID_LOG(error, "Query failed: queue not found, name=" << name); return Manageable::STATUS_UNKNOWN_OBJECT; } q->query( results ); return Manageable::STATUS_OK;; } Manageable::status_t Broker::getTimestampConfig(bool& receive, const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) { 
throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId)); } receive = timestampRcvMsgs; return Manageable::STATUS_OK; } Manageable::status_t Broker::setTimestampConfig(const bool receive, const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) { throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId)); } timestampRcvMsgs = receive; QPID_LOG(notice, "Receive message timestamping is " << ((timestampRcvMsgs) ? "ENABLED." : "DISABLED.")); return Manageable::STATUS_OK; } void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); std::vector selectors; split(selectors, level, ", "); qpid::log::Logger::instance().reconfigure(selectors); } std::string Broker::getLogLevel() { std::string level; std::string sep(""); const std::vector& selectors = qpid::log::Logger::instance().getOptions().selectors; for (std::vector::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { level += sep + *i; sep = ","; } const std::vector& disselectors = qpid::log::Logger::instance().getOptions().deselectors; for (std::vector::const_iterator i = disselectors.begin(); i != disselectors.end(); ++i) { level += sep + "!" 
+ *i; sep = ","; } return level; } void Broker::setLogHiresTimestamp(bool enabled) { QPID_LOG(notice, "Changing log hires timestamp to " << enabled); qpid::log::Logger::instance().setHiresTimestamp(enabled); } bool Broker::getLogHiresTimestamp() { return qpid::log::Logger::instance().getHiresTimestamp(); } Manageable::status_t Broker::queueRedirect(const std::string& srcQueue, const std::string& tgtQueue, const Connection* context) { Queue::shared_ptr srcQ(queues.find(srcQueue)); if (!srcQ) { QPID_LOG(error, "Queue redirect failed: source queue not found: " << srcQueue); return Manageable::STATUS_UNKNOWN_OBJECT; } if (!tgtQueue.empty()) { // NonBlank target queue creates partnership Queue::shared_ptr tgtQ(queues.find(tgtQueue)); if (!tgtQ) { QPID_LOG(error, "Queue redirect failed: target queue not found: " << tgtQueue); return Manageable::STATUS_UNKNOWN_OBJECT; } if (srcQueue.compare(tgtQueue) == 0) { QPID_LOG(error, "Queue redirect source queue: " << tgtQueue << " cannot be its own target"); return Manageable::STATUS_USER; } if (srcQ->isAutoDelete()) { QPID_LOG(error, "Queue redirect source queue: " << srcQueue << " is autodelete and can not be part of redirect"); return Manageable::STATUS_USER; } if (tgtQ->isAutoDelete()) { QPID_LOG(error, "Queue redirect target queue: " << tgtQueue << " is autodelete and can not be part of redirect"); return Manageable::STATUS_USER; } if (srcQ->getRedirectPeer()) { QPID_LOG(error, "Queue redirect source queue: " << srcQueue << " is already redirected"); return Manageable::STATUS_USER; } if (tgtQ->getRedirectPeer()) { QPID_LOG(error, "Queue redirect target queue: " << tgtQueue << " is already redirected"); return Manageable::STATUS_USER; } if (acl) { std::map params; params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName())); if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), ¶ms)) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << 
((context)?context->getUserId():"(uknown)"))); } // Start the backup overflow partnership srcQ->setRedirectPeer(tgtQ, true); tgtQ->setRedirectPeer(srcQ, false); // Set management state srcQ->setMgmtRedirectState(tgtQueue, true, true); tgtQ->setMgmtRedirectState(srcQueue, true, false); // Management event if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventQueueRedirect(srcQueue, tgtQueue)); } QPID_LOG(info, "Queue redirect complete. queue: " << srcQueue << " target queue: " << tgtQueue); return Manageable::STATUS_OK; } else { // Blank target queue destroys partnership Queue::shared_ptr tgtQ(srcQ->getRedirectPeer()); if (!tgtQ) { QPID_LOG(error, "Queue redirect source queue: " << srcQueue << " is not in redirected"); return Manageable::STATUS_USER; } if (!srcQ->isRedirectSource()) { QPID_LOG(error, "Queue redirect source queue: " << srcQueue << " is not a redirect source"); return Manageable::STATUS_USER; } if (acl) { std::map params; params.insert(make_pair(acl::PROP_QUEUENAME, tgtQ->getName())); if (!acl->authorise((context)?context->getUserId():"", acl::ACT_REDIRECT, acl::OBJ_QUEUE, srcQ->getName(), ¶ms)) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied redirect request from " << ((context)?context->getUserId():"(uknown)"))); } queueRedirectDestroy(srcQ, tgtQ, true); return Manageable::STATUS_OK; } } void Broker::queueRedirectDestroy(Queue::shared_ptr srcQ, Queue::shared_ptr tgtQ, bool moveMsgs) { QPID_LOG(notice, "Queue redirect destroyed. queue: " << srcQ->getName() << " target queue: " << tgtQ->getName()); tgtQ->setMgmtRedirectState(empty, false, false); srcQ->setMgmtRedirectState(empty, false, false); if (moveMsgs) { // TODO: this 'move' works in the static case but has no // actual locking that does what redirect needs when // there is a lot of traffic in flight. 
tgtQ->move(srcQ, 0); } Queue::shared_ptr np; tgtQ->setRedirectPeer(np, false); srcQ->setRedirectPeer(np, false); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventQueueRedirectCancelled(srcQ->getName(), tgtQ->getName())); } } const Broker::TransportInfo& Broker::getTransportInfo(const std::string& name) const { static TransportInfo nullTransportInfo; TransportMap::const_iterator i = name.empty() ? transportMap.begin() : transportMap.find(name); if (i == transportMap.end()) return nullTransportInfo; else return i->second; } uint16_t Broker::getPort(const std::string& name) const { if (int p = getTransportInfo(name).port) { return p; } else { throw NoSuchTransportException(QPID_MSG("No such transport: '" << name << "'")); } } bool Broker::shouldListen(std::string transport) { return disabledListeningTransports.count(transport)==0; } void Broker::disableListening(std::string transport) { disabledListeningTransports.insert(transport); } void Broker::registerTransport(const std::string& name, boost::shared_ptr a, boost::shared_ptr c, uint16_t p) { transportMap[name] = TransportInfo(a, c, p); Url::addProtocol(name); } void Broker::accept() { unsigned accepting = 0; for (TransportMap::const_iterator i = transportMap.begin(); i != transportMap.end(); i++) { if (i->second.acceptor) { i->second.acceptor->accept(poller, factory.get()); ++accepting; } } if ( accepting==0 ) { throw Exception(QPID_MSG("Failed to start broker: No transports are listening for incoming connections")); } } void Broker::connect( const std::string& name, const std::string& host, const std::string& port, const std::string& transport, boost::function2 failed) { connect(name, host, port, transport, factory.get(), failed); } void Broker::connect( const std::string& name, const std::string& host, const std::string& port, const std::string& transport, sys::ConnectionCodec::Factory* f, boost::function2 failed) { boost::shared_ptr tcf = getTransportInfo(transport).connectorFactory; if (tcf) 
tcf->connect(poller, name, host, port, f, failed); else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); } int32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, uint32_t qty, const Variant::Map& filter, const Connection* context) { Queue::shared_ptr src_queue = queues.find(srcQueue); if (!src_queue) return -1; Queue::shared_ptr dest_queue = queues.find(destQueue); if (!dest_queue) return -1; if (acl) { std::map params; params.insert(make_pair(acl::PROP_QUEUENAME, dest_queue->getName())); if (!acl->authorise((context)?context->getUserId():"", acl::ACT_MOVE, acl::OBJ_QUEUE, src_queue->getName(), ¶ms)) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied move request from " << ((context)?context->getUserId():"(uknown)"))); } return (int32_t) src_queue->move(dest_queue, qty, &filter); } boost::shared_ptr Broker::getPoller() { return poller; } std::vector Broker::getKnownBrokersImpl() { return knownBrokers; } bool Broker::deferDeliveryImpl(const std::string&, const Message&) { return false; } const std::string Broker::TCP_TRANSPORT("tcp"); std::pair, bool> Broker::createQueue( const std::string& name, const QueueSettings& constSettings, const OwnershipToken* owner, const std::string& alternateExchange, const std::string& userId, const std::string& connectionId) { QueueSettings settings(constSettings); // So we can modify them if (acl) { std::map params; params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); params.insert(make_pair(acl::PROP_DURABLE, settings.durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, settings.autodelete ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_POLICYTYPE, settings.getLimitPolicy())); params.insert(make_pair(acl::PROP_PAGING, settings.paging ? 
_TRUE : _FALSE)); if (settings.paging) { params.insert(make_pair(acl::PROP_MAXPAGES, boost::lexical_cast(settings.maxPages ? settings.maxPages : DEFAULT_MAX_PAGES))); params.insert(make_pair(acl::PROP_MAXPAGEFACTOR, boost::lexical_cast(settings.pageFactor ? settings.pageFactor : DEFAULT_PAGE_FACTOR))); } if (settings.maxDepth.hasCount()) params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast(settings.maxDepth.getCount() ? settings.maxDepth.getCount() : std::numeric_limits::max()))); if (settings.maxDepth.hasSize()) params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast(settings.maxDepth.getSize() ? settings.maxDepth.getSize() : std::numeric_limits::max()))); else params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast(config.queueLimit))); if (settings.durable) { params.insert(make_pair(acl::PROP_MAXFILECOUNT, boost::lexical_cast(settings.maxFileCount ? settings.maxFileCount : 8))); params.insert(make_pair(acl::PROP_MAXFILESIZE, boost::lexical_cast(settings.maxFileSize ? settings.maxFileSize : 24))); } if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); if (!queues.find(name)) if (!acl->approveCreateQueue(userId,name) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); } Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = exchanges.get(alternateExchange); if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } // Identify queues that won't survive a failover: exclusive, auto-delete with no delay. 
if (owner && settings.autodelete && !settings.autoDeleteDelay) settings.isTemporary = true; std::pair result = queues.declare(name, settings, alternate, false/*recovering*/, owner, connectionId, userId); if (result.second) { //add default binding: result.first->bind(exchanges.getDefault(), name); QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << userId << " rhost:" << connectionId << " durable:" << (settings.durable ? "T" : "F") << " owner:" << owner << " autodelete:" << (settings.autodelete ? "T" : "F") << " alternateExchange:" << alternateExchange ); } return result; } void Broker::deleteQueue(const std::string& name, const std::string& userId, const std::string& connectionId, QueueFunctor check) { QPID_LOG_CAT(debug, model, "Deleting queue. name:" << name << " user:" << userId << " rhost:" << connectionId ); Queue::shared_ptr queue = queues.find(name); if (queue) { if (acl) { std::map params; boost::shared_ptr altEx = queue->getAlternateExchange(); params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" )); params.insert(make_pair(acl::PROP_DURABLE, queue->isDurable() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, queue->isAutoDelete() ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_POLICYTYPE, queue->getSettings().getLimitPolicy())); if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); } if (check) check(queue); if (acl) acl->recordDestroyQueue(name); Queue::shared_ptr peerQ(queue->getRedirectPeer()); if (peerQ) queueRedirectDestroy(queue->isRedirectSource() ? queue : peerQ, queue->isRedirectSource() ? peerQ : queue, false); queues.destroy(name, connectionId, userId); queue->destroyed(); } else { throw framing::NotFoundException(QPID_MSG("Delete failed. 
No such queue: " << name)); } } std::pair Broker::createExchange( const std::string& name, const std::string& type, bool durable, bool autodelete, const std::string& alternateExchange, const qpid::framing::FieldTable& arguments, const std::string& userId, const std::string& connectionId) { if (acl) { std::map params; params.insert(make_pair(acl::PROP_TYPE, type)); params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); } Exchange::shared_ptr alternate; if (!alternateExchange.empty()) { alternate = exchanges.get(alternateExchange); if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); } std::pair result; result = exchanges.declare( name, type, durable, autodelete, arguments, alternate, connectionId, userId); if (result.second) { if (durable) { store->create(*result.first, arguments); } QPID_LOG_CAT(debug, model, "Create exchange. name:" << name << " user:" << userId << " rhost:" << connectionId << " type:" << type << " alternateExchange:" << alternateExchange << " durable:" << (durable ? "T" : "F") << " autodelete:" << (autodelete ? "T" : "F")); } return result; } void Broker::deleteExchange(const std::string& name, const std::string& userId, const std::string& connectionId) { QPID_LOG_CAT(debug, model, "Deleting exchange. name:" << name << " user:" << userId << " rhost:" << connectionId); if (name.empty()) { throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange")); } Exchange::shared_ptr exchange(exchanges.get(name)); if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. 
No such exchange: " << name)); if (acl) { std::map params; Exchange::shared_ptr altEx = exchange->getAlternate(); params.insert(make_pair(acl::PROP_TYPE, exchange->getType())); params.insert(make_pair(acl::PROP_ALTERNATE, (altEx) ? altEx->getName() : "" )); params.insert(make_pair(acl::PROP_DURABLE, exchange->isDurable() ? _TRUE : _FALSE)); if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); } if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Cannot delete " << name <<", in use as alternate-exchange.")); if (exchange->isDurable()) store->destroy(*exchange); if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); exchanges.destroy(name, connectionId, userId); } void Broker::bind(const std::string& queueName, const std::string& exchangeName, const std::string& key, const qpid::framing::FieldTable& arguments, const OwnershipToken* owner, const std::string& userId, const std::string& connectionId) { if (acl) { std::map params; params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); } if (exchangeName.empty()) { throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange")); } Queue::shared_ptr queue = queues.find(queueName); Exchange::shared_ptr exchange = exchanges.get(exchangeName); if (!queue) { throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); } else if (!exchange) { throw framing::NotFoundException(QPID_MSG("Bind failed. 
No such exchange: " << exchangeName)); } else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) { throw framing::ResourceLockedException(QPID_MSG("Cannot bind queue " << queue->getName() << "; it is exclusive to another session")); } else { if (queue->bind(exchange, key, arguments)) { getBrokerObservers().bind(exchange, queue, key, arguments); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, queueName, key, ManagementAgent::toMap(arguments))); } QPID_LOG_CAT(debug, model, "Create binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key << " arguments:" << arguments << " user:" << userId << " rhost:" << connectionId); } } } void Broker::unbind(const std::string& queueName, const std::string& exchangeName, const std::string& key, const OwnershipToken* owner, const std::string& userId, const std::string& connectionId) { if (acl) { std::map params; params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); } if (exchangeName.empty()) { throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange")); } Queue::shared_ptr queue = queues.find(queueName); Exchange::shared_ptr exchange = exchanges.get(exchangeName); if (!queue) { throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName)); } else if (!exchange) { throw framing::NotFoundException(QPID_MSG("Unbind failed. 
No such exchange: " << exchangeName)); } else if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(owner)) { throw framing::ResourceLockedException(QPID_MSG("Cannot unbind queue " << queue->getName() << "; it is exclusive to another session")); } else { if (exchange->unbind(queue, key, 0)) { if (exchange->isDurable() && queue->isDurable()) { store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); } getBrokerObservers().unbind( exchange, queue, key, framing::FieldTable()); if (managementAgent.get()) { managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); } QPID_LOG_CAT(debug, model, "Delete binding. exchange:" << exchangeName << " queue:" << queueName << " key:" << key << " user:" << userId << " rhost:" << connectionId); } } } // FIXME aconway 2012-04-27: access to linkClientProperties is // not properly thread safe, you could lose fields if 2 threads // attempt to add a field concurrently. framing::FieldTable Broker::getLinkClientProperties() const { sys::Mutex::ScopedLock l(linkClientPropertiesLock); return linkClientProperties; } void Broker::setLinkClientProperties(const framing::FieldTable& ft) { sys::Mutex::ScopedLock l(linkClientPropertiesLock); linkClientProperties = ft; } }} // namespace qpid::broker