diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/SessionAdapter.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionAdapter.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 28 |
1 files changed, 12 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index cb2fe15b58..1ea18ea472 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -16,7 +16,10 @@ * */ #include "qpid/broker/SessionAdapter.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" @@ -98,17 +101,6 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventExchangeDeclare(getConnection().getUrl(), - getConnection().getUserId(), - exchange, - type, - alternateExchange, - durable, - false, - ManagementAgent::toMap(args), - "existing")); QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() @@ -165,12 +157,14 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, { getBroker().bind(queueName, exchangeName, routingKey, arguments, getConnection().getUserId(), getConnection().getUrl()); + state.addBinding(queueName, exchangeName, routingKey, arguments); } void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, const string& exchangeName, const string& routingKey) { + state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, getConnection().getUserId(), getConnection().getUrl()); } @@ -300,6 +294,8 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } catch (const qpid::types::Exception& e) { throw InvalidArgumentException(e.what()); } + // Identify queues that won't survive a failover. + settings.isTemporary = exclusive && autoDelete && !settings.autoDeleteDelay; std::pair<Queue::shared_ptr, bool> queue_created = getBroker().createQueue(name, settings, @@ -318,11 +314,6 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& if (exclusive && queue->setExclusiveOwner(&session)) { exclusiveQueues.push_back(queue); } - ManagementAgent* agent = getBroker().getManagementAgent(); - if (agent) - agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), - "existing")); QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << getConnection().getUserId() << " rhost:" << getConnection().getUrl() @@ -422,6 +413,11 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, if(!destination.empty() && state.exists(destination)) throw NotAllowedException(QPID_MSG("Consumer tags must be unique")); + if (queue->getSettings().isBrowseOnly && acquireMode == 0) { + QPID_LOG(info, "Overriding request to consume from browse-only queue " << queue->getName()); + acquireMode = 1; + } + // We allow browsing (acquireMode == 1) of exclusive queues, this is required by HA. if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session) && acquireMode == 0) throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive queue " |