diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/broker/SemanticState.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 167 |
1 files changed, 113 insertions, 54 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 5fc9a1a932..751b3ff709 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -20,6 +20,8 @@ */ #include "qpid/broker/SessionState.h" + +#include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DtxAck.h" @@ -35,12 +37,14 @@ #include "qpid/framing/SequenceSet.h" #include "qpid/framing/IsInSequenceSet.h" #include "qpid/log/Statement.h" -#include "qpid/sys/ClusterSafe.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/ptr_map.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/FedOps.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/tuple/tuple_comparison.hpp> #include <iostream> #include <sstream> @@ -49,6 +53,11 @@ #include <assert.h> +namespace { +const std::string X_SCOPE("x-scope"); +const std::string SESSION("session"); +} + namespace qpid { namespace broker { @@ -88,6 +97,7 @@ void SemanticState::closed() { if (dtxBuffer.get()) { dtxBuffer->fail(); } + unbindSessionBindings(); requeue(); //now unsubscribe, which may trigger queue deletion and thus @@ -277,7 +287,7 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, +SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, bool ack, @@ -303,7 +313,7 @@ Consumer(_name, type), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), deliveryCount(0), - mgmtObject(0) + protocols(parent->getSession().getBroker().getProtocolRegistry()) { if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) { @@ -312,20 +322,20 @@ Consumer(_name, type), if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), - !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); + mgmtObject = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), + !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments))); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); } } } -ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const { - return (ManagementObject*) mgmtObject; + return mgmtObject; } -Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -335,24 +345,23 @@ Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t met } -OwnershipToken* SemanticState::ConsumerImpl::getSession() +OwnershipToken* SemanticStateConsumerImpl::getSession() { return &(parent->session); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) { return deliver(cursor, msg, shared_from_this()); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) { - assertClusterSafe(); allocateCredit(msg); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); DeliveryRecord record(cursor, msg.getSequence(), queue, getTag(), - consumer, acquire, !ackExpected, credit.isWindowMode(), amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + consumer, acquire, !ackExpected, credit.isWindowMode(), transfer->getRequiredCredit()); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset - const amqp_0_10::MessageTransfer* transfer = dynamic_cast<const amqp_0_10::MessageTransfer*>(&msg.getEncoding()); record.setId(parent->session.deliver(*transfer, getTag(), msg.isRedelivered(), msg.getTtl(), msg.getTimestamp(), ackExpected ? message::ACCEPT_MODE_EXPLICIT : message::ACCEPT_MODE_NONE, @@ -370,27 +379,26 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa return true; } -bool SemanticState::ConsumerImpl::filter(const Message&) +bool SemanticStateConsumerImpl::filter(const Message&) { return true; } -bool SemanticState::ConsumerImpl::accept(const Message& msg) +bool SemanticStateConsumerImpl::accept(const Message& msg) { - assertClusterSafe(); // TODO aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. // - blocked = !(filter(msg) && checkCredit(msg)); + blocked = !checkCredit(msg); return !blocked; } namespace { struct ConsumerName { - const SemanticState::ConsumerImpl& consumer; - ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} + const SemanticStateConsumerImpl& consumer; + ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {} }; ostream& operator<<(ostream& o, const ConsumerName& pc) { @@ -399,26 +407,27 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { } } -void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) +void SemanticStateConsumerImpl::allocateCredit(const Message& msg) { - assertClusterSafe(); Credit original = credit; - credit.consume(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); + credit.consume(1, transfer->getRequiredCredit()); QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << original << " now " << credit); } -bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) +bool SemanticStateConsumerImpl::checkCredit(const Message& msg) { - bool enoughCredit = credit.check(1, qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg)); + boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); + bool enoughCredit = credit.check(1, transfer->getRequiredCredit()); QPID_LOG(debug, "Subscription " << ConsumerName(*this) << " has " << (enoughCredit ? "sufficient " : "insufficient") - << " credit for message of " << qpid::broker::amqp_0_10::MessageTransfer::getRequiredCredit(msg) << " bytes: " + << " credit for message of " << transfer->getRequiredCredit() << " bytes: " << credit); return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticStateConsumerImpl::~SemanticStateConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -437,9 +446,9 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + // Only run auto-delete for counted consumers. + if (c->isCounted() && queue->canAutoDelete() && !queue->hasExclusiveOwner()) Queue::tryAutoDelete(session.getBroker(), queue, connectionId, userID); - } } c->cancel(); } @@ -491,9 +500,8 @@ void SemanticState::requestDispatch() i->second->requestDispatch(); } -void SemanticState::ConsumerImpl::requestDispatch() +void SemanticStateConsumerImpl::requestDispatch() { - assertClusterSafe(); if (blocked) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -510,7 +518,7 @@ bool SemanticState::complete(DeliveryRecord& delivery) return delivery.isRedundant(); } -void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) +void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); @@ -535,7 +543,7 @@ SessionContext& SemanticState::getSession() { return session; } const SessionContext& SemanticState::getSession() const { return session; } -const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const +const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { ConsumerImpl::shared_ptr consumer; if (!find(destination, consumer)) { @@ -592,37 +600,33 @@ void SemanticState::stop(const std::string& destination) find(destination)->stop(); } -void SemanticState::ConsumerImpl::setWindowMode() +void SemanticStateConsumerImpl::setWindowMode() { - assertClusterSafe(); credit.setWindowMode(true); if (mgmtObject){ mgmtObject->set_creditMode("WINDOW"); } } -void SemanticState::ConsumerImpl::setCreditMode() +void SemanticStateConsumerImpl::setCreditMode() { - assertClusterSafe(); credit.setWindowMode(false); if (mgmtObject){ mgmtObject->set_creditMode("CREDIT"); } } -void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) +void SemanticStateConsumerImpl::addByteCredit(uint32_t value) { - assertClusterSafe(); credit.addByteCredit(value); } -void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) +void SemanticStateConsumerImpl::addMessageCredit(uint32_t value) { - assertClusterSafe(); credit.addMessageCredit(value); } -bool SemanticState::ConsumerImpl::haveCredit() +bool SemanticStateConsumerImpl::haveCredit() { if (credit) { return true; @@ -632,21 +636,20 @@ bool SemanticState::ConsumerImpl::haveCredit() } } -bool SemanticState::ConsumerImpl::doDispatch() +bool SemanticStateConsumerImpl::doDispatch() { return queue->dispatch(shared_from_this()); } -void SemanticState::ConsumerImpl::flush() +void SemanticStateConsumerImpl::flush() { while(haveCredit() && doDispatch()) ; credit.cancel(); } -void SemanticState::ConsumerImpl::stop() +void SemanticStateConsumerImpl::stop() { - assertClusterSafe(); credit.cancel(); } @@ -700,7 +703,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) getSession().setUnackedCount(unacked.size()); } -bool SemanticState::ConsumerImpl::doOutput() +bool SemanticStateConsumerImpl::doOutput() { try { return haveCredit() && doDispatch(); @@ -709,28 +712,26 @@ bool SemanticState::ConsumerImpl::doOutput() } } -void SemanticState::ConsumerImpl::enableNotify() +void SemanticStateConsumerImpl::enableNotify() { Mutex::ScopedLock l(lock); - assertClusterSafe(); notifyEnabled = true; } -void SemanticState::ConsumerImpl::disableNotify() +void SemanticStateConsumerImpl::disableNotify() { Mutex::ScopedLock l(lock); notifyEnabled = false; } -bool SemanticState::ConsumerImpl::isNotifyEnabled() const { +bool SemanticStateConsumerImpl::isNotifyEnabled() const { Mutex::ScopedLock l(lock); return notifyEnabled; } -void SemanticState::ConsumerImpl::notify() +void SemanticStateConsumerImpl::notify() { Mutex::ScopedLock l(lock); - assertClusterSafe(); if (notifyEnabled) { parent->session.getConnection().outputTasks.addOutputTask(this); parent->session.getConnection().outputTasks.activateOutput(); @@ -755,7 +756,6 @@ isInSequenceSetAnd(const SequenceSet& s, Predicate p) { } void SemanticState::accepted(const SequenceSet& commands) { - assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: @@ -815,4 +815,63 @@ void SemanticState::detached() } } +void SemanticState::addBinding(const string& queueName, const string& exchangeName, + const string& routingKey, const framing::FieldTable& arguments) +{ + QPID_LOG (debug, "SemanticState::addBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey << ", " + << "args=" << arguments << "]"); + std::string fedOp = arguments.getAsString(qpidFedOp); + if ((arguments.isSet(qpidFedOp)) && (fedOp.empty())) { + fedOp = fedOpBind; + } + std::string fedOrigin = arguments.getAsString(qpidFedOrigin); + if ((arguments.getAsString(X_SCOPE) == SESSION) || (fedOp == fedOpBind)) { + bindings.insert(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } + else if (fedOp == fedOpUnbind) { + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, fedOrigin)); + } +} + +void SemanticState::removeBinding(const string& queueName, const string& exchangeName, + const string& routingKey) +{ + QPID_LOG (debug, "SemanticState::removeBinding [" + << "queue=" << queueName << ", " + << "exchange=" << exchangeName << ", " + << "key=" << routingKey) + bindings.erase(boost::make_tuple(queueName, exchangeName, routingKey, "")); +} + +void SemanticState::unbindSessionBindings() +{ + //unbind session-scoped bindings + for (Bindings::iterator i = bindings.begin(); i != bindings.end(); i++) { + QPID_LOG (debug, "SemanticState::unbindSessionBindings [" + << "queue=" << i->get<0>() << ", " + << "exchange=" << i->get<1>()<< ", " + << "key=" << i->get<2>() << ", " + << "fedOrigin=" << i->get<3>() << "]"); + try { + std::string fedOrigin = i->get<3>(); + if (!fedOrigin.empty()) { + framing::FieldTable fedArguments; + fedArguments.setString(qpidFedOp, fedOpUnbind); + fedArguments.setString(qpidFedOrigin, fedOrigin); + session.getBroker().bind(i->get<0>(), i->get<1>(), i->get<2>(), fedArguments, + userID, connectionId); + } else { + session.getBroker().unbind(i->get<0>(), i->get<1>(), i->get<2>(), + userID, connectionId); + } + } + catch (...) { + } + } + bindings.clear(); +} + }} // namespace qpid::broker |