diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 566 |
1 files changed, 546 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3c692fc368..ec3cf9d340 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Broker.h" +#include "qpid/broker/ConnectionState.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" @@ -31,12 +32,26 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerGetLogLevel.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQueueMoveMessages.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerSetLogLevel.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerSetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerGetTimestampConfig.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" +#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" +#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" +#include "qmf/org/apache/qpid/broker/EventQueueDelete.h" +#include "qmf/org/apache/qpid/broker/EventBind.h" +#include "qmf/org/apache/qpid/broker/EventUnbind.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/management/ManagementDirectExchange.h" #include "qpid/management/ManagementTopicExchange.h" #include "qpid/log/Logger.h" @@ -44,7 +59,9 @@ #include "qpid/log/Statement.h" #include "qpid/log/posix/SinkOptions.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/ProtocolInitiation.h" +#include "qpid/framing/reply_exceptions.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/ProtocolFactory.h" #include "qpid/sys/Poller.h" @@ -76,7 +93,10 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using qpid::management::getManagementExecutionContext; +using qpid::types::Variant; using std::string; +using std::make_pair; namespace _qmf = qmf::org::apache::qpid::broker; @@ -103,7 +123,12 @@ Broker::Options::Options(const std::string& name) : maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. qmf2Support(true), - qmf1Support(true) + qmf1Support(true), + queueFlowStopRatio(80), + queueFlowResumeRatio(70), + queueThresholdEventRatio(80), + defaultMsgGroup("qpid.no-group"), + timestampRcvMsgs(false) // set the 0.10 timestamp delivery property { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -134,9 +159,14 @@ Broker::Options::Options(const std::string& name) : ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") - ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location") + ("sasl-config", optValue(saslConfigPath, "DIR"), "gets sasl config info from nonstandard location") ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") + ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is activated.") + ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "PERCENT"), "Percent of queue's maximum capacity at which flow control is de-activated.") + ("default-event-threshold-ratio", optValue(queueThresholdEventRatio, "%age of limit"), "The ratio of any specified queue limit at which an event will be raised") + ("default-message-group", optValue(defaultMsgGroup, "GROUP-IDENTIFER"), "Group identifier to assign to messages delivered to a message group queue that do not contain an identifier.") + ("enable-timestamp", optValue(timestampRcvMsgs, "yes|no"), "Add current time to each received message."); } const std::string empty; @@ -166,9 +196,10 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, timer), - queueEvents(poller,!conf.asyncQueueEvents), + queueCleaner(queues, &timer), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), + inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), @@ -225,8 +256,11 @@ Broker::Broker(const Broker::Options& conf) : // Early-Initialize plugins Plugin::earlyInitAll(*this); + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + MessageGroupManager::setDefaults(conf.defaultMsgGroup); + // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) + if (NullMessageStore::isNullStore(store.get())) setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -271,6 +305,11 @@ Broker::Broker(const Broker::Options& conf) : else QPID_LOG(info, "Management not enabled"); + // this feature affects performance, so let's be sure that gets logged! + if (conf.timestampRcvMsgs) { + QPID_LOG(notice, "Receive message timestamping is ENABLED."); + } + /** * SASL setup, can fail and terminate startup */ @@ -345,14 +384,14 @@ void Broker::run() { Dispatcher d(poller); int numIOThreads = config.workerThreads; std::vector<Thread> t(numIOThreads-1); - + // Run n-1 io threads for (int i=0; i<numIOThreads-1; ++i) t[i] = Thread(d); - + // Run final thread d.run(); - + // Now wait for n-1 io threads to exit for (int i=0; i<numIOThreads-1; ++i) { t[i].join(); @@ -399,9 +438,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { case _qmf::Broker::METHOD_ECHO : QPID_LOG (debug, "Broker::echo(" - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence - << ", " - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body << ")"); status = Manageable::STATUS_OK; break; @@ -409,8 +448,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerConnect& hp= dynamic_cast<_qmf::ArgsBrokerConnect&>(args); - QPID_LOG (debug, "Broker::connect()"); string transport = hp.i_transport.empty() ? TCP_TRANSPORT : hp.i_transport; + QPID_LOG (debug, "Broker::connect() " << hp.i_host << ":" << hp.i_port << "; transport=" << transport << + "; durable=" << (hp.i_durable?"T":"F") << "; authMech=\"" << hp.i_authMechanism << "\""); if (!getProtocolFactory(transport)) { QPID_LOG(error, "Transport '" << transport << "' not supported"); return Manageable::STATUS_NOT_IMPLEMENTED; @@ -427,9 +467,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, _qmf::ArgsBrokerQueueMoveMessages& moveArgs= dynamic_cast<_qmf::ArgsBrokerQueueMoveMessages&>(args); QPID_LOG (debug, "Broker::queueMoveMessages()"); - if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty)) + if (queueMoveMessages(moveArgs.i_srcQueue, moveArgs.i_destQueue, moveArgs.i_qty, moveArgs.i_filter)) status = Manageable::STATUS_OK; - else + else return Manageable::STATUS_PARAMETER_INVALID; break; } @@ -443,6 +483,38 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, QPID_LOG (debug, "Broker::getLogLevel()"); status = Manageable::STATUS_OK; break; + case _qmf::Broker::METHOD_CREATE : + { + _qmf::ArgsBrokerCreate& a = dynamic_cast<_qmf::ArgsBrokerCreate&>(args); + createObject(a.i_type, a.i_name, a.i_properties, a.i_strict, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_DELETE : + { + _qmf::ArgsBrokerDelete& a = dynamic_cast<_qmf::ArgsBrokerDelete&>(args); + deleteObject(a.i_type, a.i_name, a.i_options, getManagementExecutionContext()); + status = Manageable::STATUS_OK; + break; + } + case _qmf::Broker::METHOD_QUERY : + { + _qmf::ArgsBrokerQuery& a = dynamic_cast<_qmf::ArgsBrokerQuery&>(args); + status = queryObject(a.i_type, a.i_name, a.o_results, getManagementExecutionContext()); + break; + } + case _qmf::Broker::METHOD_GETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerGetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerGetTimestampConfig&>(args); + status = getTimestampConfig(a.o_receive, getManagementExecutionContext()); + break; + } + case _qmf::Broker::METHOD_SETTIMESTAMPCONFIG: + { + _qmf::ArgsBrokerSetTimestampConfig& a = dynamic_cast<_qmf::ArgsBrokerSetTimestampConfig&>(args); + status = setTimestampConfig(a.i_receive, getManagementExecutionContext()); + break; + } default: QPID_LOG (debug, "Broker ManagementMethod not implemented: id=" << methodId << "]"); status = Manageable::STATUS_NOT_IMPLEMENTED; @@ -452,6 +524,240 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } +namespace +{ +const std::string TYPE_QUEUE("queue"); +const std::string TYPE_EXCHANGE("exchange"); +const std::string TYPE_TOPIC("topic"); +const std::string TYPE_BINDING("binding"); +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +const std::string QUEUE_NAME("queue"); +const std::string EXCHANGE_NAME("exchange"); + +const std::string ATTRIBUTE_TIMESTAMP_0_10("timestamp-0.10"); + +const std::string _TRUE("true"); +const std::string _FALSE("false"); +} + +struct InvalidBindingIdentifier : public qpid::Exception +{ + InvalidBindingIdentifier(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "invalid binding"; } +}; + +struct BindingIdentifier +{ + std::string exchange; + std::string queue; + std::string key; + + BindingIdentifier(const std::string& name) + { + std::vector<std::string> path; + split(path, name, "/"); + switch (path.size()) { + case 1: + queue = path[0]; + break; + case 2: + exchange = path[0]; + queue = path[1]; + break; + case 3: + exchange = path[0]; + queue = path[1]; + key = path[2]; + break; + default: + throw InvalidBindingIdentifier(name); + } + } +}; + +struct ObjectAlreadyExists : public qpid::Exception +{ + ObjectAlreadyExists(const std::string& name) : qpid::Exception(name) {} + std::string getPrefix() const { return "object already exists"; } +}; + +struct UnknownObjectType : public qpid::Exception +{ + UnknownObjectType(const std::string& type) : qpid::Exception(type) {} + std::string getPrefix() const { return "unknown object type"; } +}; + +void Broker::createObject(const std::string& type, const std::string& name, + const Variant::Map& properties, bool /*strict*/, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + //TODO: implement 'strict' option (check there are no unrecognised properties) + QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); + if (type == TYPE_QUEUE) { + bool durable(false); + bool autodelete(false); + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == AUTO_DELETE) autodelete = i->second; + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + std::pair<boost::shared_ptr<Queue>, bool> result = + createQueue(name, durable, autodelete, 0, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + bool durable(false); + std::string exchangeType("topic"); + std::string alternateExchange; + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == DURABLE) durable = i->second; + else if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + else if (i->first == ALTERNATE_EXCHANGE) alternateExchange = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + try { + std::pair<boost::shared_ptr<Exchange>, bool> result = + createExchange(name, exchangeType, durable, alternateExchange, arguments, userId, connectionId); + if (!result.second) { + throw ObjectAlreadyExists(name); + } + } catch (const UnknownExchangeTypeException&) { + throw Exception(QPID_MSG("Invalid exchange type: " << exchangeType)); + } + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + std::string exchangeType("topic"); + Variant::Map extensions; + for (Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) { + // extract durable, auto-delete and alternate-exchange properties + if (i->first == EXCHANGE_TYPE) exchangeType = i->second.asString(); + //treat everything else as extension properties + else extensions[i->first] = i->second; + } + framing::FieldTable arguments; + amqp_0_10::translate(extensions, arguments); + + bind(binding.queue, binding.exchange, binding.key, arguments, userId, connectionId); + } else { + throw UnknownObjectType(type); + } +} + +void Broker::deleteObject(const std::string& type, const std::string& name, + const Variant::Map& options, const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); + if (type == TYPE_QUEUE) { + deleteQueue(name, userId, connectionId); + } else if (type == TYPE_EXCHANGE || type == TYPE_TOPIC) { + deleteExchange(name, userId, connectionId); + } else if (type == TYPE_BINDING) { + BindingIdentifier binding(name); + unbind(binding.queue, binding.exchange, binding.key, userId, connectionId); + } else { + throw UnknownObjectType(type); + } + +} + +Manageable::status_t Broker::queryObject(const std::string& type, + const std::string& name, + Variant::Map& results, + const ConnectionState* context) +{ + std::string userId; + std::string connectionId; + if (context) { + userId = context->getUserId(); + connectionId = context->getUrl(); + } + QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); + + if (type == TYPE_QUEUE) + return queryQueue( name, userId, connectionId, results ); + + if (type == TYPE_EXCHANGE || + type == TYPE_TOPIC || + type == TYPE_BINDING) + return Manageable::STATUS_NOT_IMPLEMENTED; + + throw UnknownObjectType(type); +} + +Manageable::status_t Broker::queryQueue( const std::string& name, + const std::string& userId, + const std::string& /*connectionId*/, + Variant::Map& results ) +{ + (void) results; + if (acl) { + if (!acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_QUEUE, name, NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue query request from " << userId)); + } + + boost::shared_ptr<Queue> q(queues.find(name)); + if (!q) { + QPID_LOG(error, "Query failed: queue not found, name=" << name); + return Manageable::STATUS_UNKNOWN_OBJECT; + } + q->query( results ); + return Manageable::STATUS_OK;; +} + +Manageable::status_t Broker::getTimestampConfig(bool& receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_ACCESS, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp get request from " << userId)); + } + receive = config.timestampRcvMsgs; + return Manageable::STATUS_OK; +} + +Manageable::status_t Broker::setTimestampConfig(const bool receive, + const ConnectionState* context) +{ + std::string name; // none needed for broker + std::string userId = context->getUserId(); + if (acl && !acl->authorise(userId, acl::ACT_UPDATE, acl::OBJ_BROKER, name, NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied broker timestamp set request from " << userId)); + } + config.timestampRcvMsgs = receive; + QPID_LOG(notice, "Receive message timestamping is " << ((config.timestampRcvMsgs) ? "ENABLED." : "DISABLED.")); + return Manageable::STATUS_OK; +} + void Broker::setLogLevel(const std::string& level) { QPID_LOG(notice, "Changing log level to " << level); @@ -466,7 +772,7 @@ std::string Broker::getLogLevel() const std::vector<std::string>& selectors = qpid::log::Logger::instance().getOptions().selectors; for (std::vector<std::string>::const_iterator i = selectors.begin(); i != selectors.end(); ++i) { if (i != selectors.begin()) level += std::string(","); - level += *i; + level += *i; } return level; } @@ -499,7 +805,7 @@ void Broker::accept() { } void Broker::connect( - const std::string& host, uint16_t port, const std::string& transport, + const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* f) { @@ -515,13 +821,14 @@ void Broker::connect( { url.throwIfEmpty(); const Address& addr=url[0]; - connect(addr.host, addr.port, addr.protocol, failed, f); + connect(addr.host, boost::lexical_cast<std::string>(addr.port), addr.protocol, failed, f); } uint32_t Broker::queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty) + uint32_t qty, + const Variant::Map& filter) { Queue::shared_ptr src_queue = queues.find(srcQueue); if (!src_queue) @@ -530,7 +837,7 @@ uint32_t Broker::queueMoveMessages( if (!dest_queue) return 0; - return src_queue->move(dest_queue, qty); + return src_queue->move(dest_queue, qty, &filter); } @@ -548,9 +855,228 @@ bool Broker::deferDeliveryImpl(const std::string& , void Broker::setClusterTimer(std::auto_ptr<sys::Timer> t) { clusterTimer = t; + queueCleaner.setTimer(clusterTimer.get()); + dtxManager.setTimer(*clusterTimer.get()); } const std::string Broker::TCP_TRANSPORT("tcp"); + +std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( + const std::string& name, + bool durable, + bool autodelete, + const OwnershipToken* owner, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_EXCLUSIVE, owner ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_AUTODELETE, autodelete ? _TRUE : _FALSE)); + params.insert(make_pair(acl::PROP_POLICYTYPE, arguments.getAsString("qpid.policy_type"))); + params.insert(make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<string>(arguments.getAsInt("qpid.max_count")))); + params.insert(make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<string>(arguments.getAsInt64("qpid.max_size")))); + + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_QUEUE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, durable, autodelete, owner, alternate, arguments); + if (result.second) { + //add default binding: + result.first->bind(exchanges.getDefault(), name); + + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent( + _qmf::EventQueueDeclare(connectionId, userId, name, + durable, owner, autodelete, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteQueue(const std::string& name, const std::string& userId, + const std::string& connectionId, QueueFunctor check) +{ + if (acl && !acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_QUEUE,name,NULL)) { + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied queue delete request from " << userId)); + } + + Queue::shared_ptr queue = queues.find(name); + if (queue) { + if (check) check(queue); + queues.destroy(name); + queue->destroyed(); + } else { + throw framing::NotFoundException(QPID_MSG("Delete failed. No such queue: " << name)); + } + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventQueueDelete(connectionId, userId, name)); + +} + +std::pair<Exchange::shared_ptr, bool> Broker::createExchange( + const std::string& name, + const std::string& type, + bool durable, + const std::string& alternateExchange, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_TYPE, type)); + params.insert(make_pair(acl::PROP_ALTERNATE, alternateExchange)); + params.insert(make_pair(acl::PROP_PASSIVE, _FALSE)); + params.insert(make_pair(acl::PROP_DURABLE, durable ? _TRUE : _FALSE)); + if (!acl->authorise(userId,acl::ACT_CREATE,acl::OBJ_EXCHANGE,name,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange create request from " << userId)); + } + + Exchange::shared_ptr alternate; + if (!alternateExchange.empty()) { + alternate = exchanges.get(alternateExchange); + if (!alternate) throw framing::NotFoundException(QPID_MSG("Alternate exchange does not exist: " << alternateExchange)); + } + + std::pair<Exchange::shared_ptr, bool> result; + result = exchanges.declare(name, type, durable, arguments); + if (result.second) { + if (alternate) { + result.first->setAlternate(alternate); + alternate->incAlternateUsers(); + } + if (durable) { + store->create(*result.first, arguments); + } + if (managementAgent.get()) { + //TODO: debatable whether we should raise an event here for + //create when this is a 'declare' event; ideally add a create + //event instead? + managementAgent->raiseEvent(_qmf::EventExchangeDeclare(connectionId, + userId, + name, + type, + alternateExchange, + durable, + false, + ManagementAgent::toMap(arguments), + "created")); + } + } + return result; +} + +void Broker::deleteExchange(const std::string& name, const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + if (!acl->authorise(userId,acl::ACT_DELETE,acl::OBJ_EXCHANGE,name,NULL) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange delete request from " << userId)); + } + + if (name.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Delete not allowed for default exchange")); + } + Exchange::shared_ptr exchange(exchanges.get(name)); + if (!exchange) throw framing::NotFoundException(QPID_MSG("Delete failed. No such exchange: " << name)); + if (exchange->inUseAsAlternate()) throw framing::NotAllowedException(QPID_MSG("Exchange in use as alternate-exchange.")); + if (exchange->isDurable()) store->destroy(*exchange); + if (exchange->getAlternate()) exchange->getAlternate()->decAlternateUsers(); + exchanges.destroy(name); + + if (managementAgent.get()) + managementAgent->raiseEvent(_qmf::EventExchangeDelete(connectionId, userId, name)); + +} + +void Broker::bind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const qpid::framing::FieldTable& arguments, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + + if (!acl->authorise(userId,acl::ACT_BIND,acl::OBJ_EXCHANGE,exchangeName,¶ms)) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange bind request from " << userId)); + } + if (exchangeName.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Bind not allowed for default exchange")); + } + + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Bind failed. No such exchange: " << exchangeName)); + } else { + if (queue->bind(exchange, key, arguments)) { + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventBind(connectionId, userId, exchangeName, + queueName, key, ManagementAgent::toMap(arguments))); + } + } + } +} + +void Broker::unbind(const std::string& queueName, + const std::string& exchangeName, + const std::string& key, + const std::string& userId, + const std::string& connectionId) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(make_pair(acl::PROP_QUEUENAME, queueName)); + params.insert(make_pair(acl::PROP_ROUTINGKEY, key)); + if (!acl->authorise(userId,acl::ACT_UNBIND,acl::OBJ_EXCHANGE,exchangeName,¶ms) ) + throw framing::UnauthorizedAccessException(QPID_MSG("ACL denied exchange unbind request from " << userId)); + } + if (exchangeName.empty()) { + throw framing::InvalidArgumentException(QPID_MSG("Unbind not allowed for default exchange")); + } + Queue::shared_ptr queue = queues.find(queueName); + Exchange::shared_ptr exchange = exchanges.get(exchangeName); + if (!queue) { + throw framing::NotFoundException(QPID_MSG("Unbind failed. No such queue: " << queueName)); + } else if (!exchange) { + throw framing::NotFoundException(QPID_MSG("Unbind failed. No such exchange: " << exchangeName)); + } else { + if (exchange->unbind(queue, key, 0)) { + if (exchange->isDurable() && queue->isDurable()) { + store->unbind(*exchange, *queue, key, qpid::framing::FieldTable()); + } + if (managementAgent.get()) { + managementAgent->raiseEvent(_qmf::EventUnbind(connectionId, userId, exchangeName, queueName, key)); + } + } + } +} + }} // namespace qpid::broker |