diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/broker/SemanticState.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 823 |
1 files changed, 0 insertions, 823 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp deleted file mode 100644 index ce86253f4a..0000000000 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ /dev/null @@ -1,823 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/SessionState.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/DtxAck.h" -#include "qpid/broker/DtxTimeout.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/SessionContext.h" -#include "qpid/broker/SessionOutputException.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/TxPublish.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/SequenceSet.h" -#include "qpid/framing/IsInSequenceSet.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/ClusterSafe.h" -#include "qpid/ptr_map.h" -#include "qpid/broker/AclModule.h" - -#include <boost/bind.hpp> -#include <boost/format.hpp> - -#include <iostream> -#include <sstream> -#include <algorithm> -#include <functional> - -#include <assert.h> - -namespace qpid { -namespace broker { - -using namespace std; -using boost::intrusive_ptr; -using boost::bind; -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; -using qpid::ptr_map_ptr; -using qpid::management::ManagementAgent; -using qpid::management::ManagementObject; -using qpid::management::Manageable; -using qpid::management::Args; -namespace _qmf = qmf::org::apache::qpid::broker; - -SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) - : session(ss), - deliveryAdapter(da), - tagGenerator("sgen"), - dtxSelected(false), - authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), - userID(getSession().getConnection().getUserId()), - userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), - isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), - closeComplete(false) -{ - acl = getSession().getBroker().getAcl(); -} - -SemanticState::~SemanticState() { - closed(); -} - -void SemanticState::closed() { - if (!closeComplete) { - //prevent requeued messages being redelivered to consumers - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - disable(i->second); - } - if (dtxBuffer.get()) { - dtxBuffer->fail(); - } - recover(true); - - //now unsubscribe, which may trigger queue deletion and thus - //needs to occur after the requeueing of unacked messages - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - unsubscribe(i->second); - } - closeComplete = true; - } -} - -bool SemanticState::exists(const string& consumerTag){ - return consumers.find(consumerTag) != consumers.end(); -} - -void SemanticState::consume(const string& tag, - Queue::shared_ptr queue, bool ackRequired, bool acquire, - bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) -{ - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; -} - -bool SemanticState::cancel(const string& tag) -{ - ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) { - cancel(i->second); - consumers.erase(i); - //should cancel all unacked messages for this consumer so that - //they are not redelivered on recovery - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - return true; - } else { - return false; - } -} - - -void SemanticState::startTx() -{ - txBuffer = TxBuffer::shared_ptr(new TxBuffer()); -} - -void SemanticState::commit(MessageStore* const store) -{ - if (!txBuffer) throw - CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - - TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); - txBuffer->enlist(txAck); - if (txBuffer->commitLocal(store)) { - accumulatedAck.clear(); - } else { - throw InternalErrorException(QPID_MSG("Commit failed")); - } -} - -void SemanticState::rollback() -{ - if (!txBuffer) - throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - - txBuffer->rollback(); - accumulatedAck.clear(); -} - -void SemanticState::selectDtx() -{ - dtxSelected = true; -} - -void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) -{ - if (!dtxSelected) { - throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); - } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); - if (join) { - mgr.join(xid, dtxBuffer); - } else { - mgr.start(xid, dtxBuffer); - } -} - -void SemanticState::endDtx(const std::string& xid, bool fail) -{ - if (!dtxBuffer) { - throw IllegalStateException(QPID_MSG("xid " << xid << " not associated with this session")); - } - if (dtxBuffer->getXid() != xid) { - throw CommandInvalidException( - QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on end")); - - } - - txBuffer.reset();//ops on this session no longer transactional - - checkDtxTimeout(); - if (fail) { - dtxBuffer->fail(); - } else { - dtxBuffer->markEnded(); - } - dtxBuffer.reset(); -} - -void SemanticState::suspendDtx(const std::string& xid) -{ - if (dtxBuffer->getXid() != xid) { - throw CommandInvalidException( - QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on suspend")); - } - txBuffer.reset();//ops on this session no longer transactional - - checkDtxTimeout(); - dtxBuffer->setSuspended(true); - suspendedXids[xid] = dtxBuffer; - dtxBuffer.reset(); -} - -void SemanticState::resumeDtx(const std::string& xid) -{ - if (!dtxSelected) { - throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); - } - - dtxBuffer = suspendedXids[xid]; - if (!dtxBuffer) { - throw CommandInvalidException(QPID_MSG("xid " << xid << " not attached")); - } else { - suspendedXids.erase(xid); - } - - if (dtxBuffer->getXid() != xid) { - throw CommandInvalidException( - QPID_MSG("xid specified on start was " << dtxBuffer->getXid() << ", but " << xid << " specified on resume")); - - } - if (!dtxBuffer->isSuspended()) { - throw CommandInvalidException(QPID_MSG("xid " << xid << " not suspended")); - } - - checkDtxTimeout(); - dtxBuffer->setSuspended(false); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); -} - -void SemanticState::checkDtxTimeout() -{ - if (dtxBuffer->isExpired()) { - dtxBuffer.reset(); - throw DtxTimeoutException(); - } -} - -void SemanticState::record(const DeliveryRecord& delivery) -{ - unacked.push_back(delivery); -} - -const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); - -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, - bool ack, - bool _acquire, - bool _exclusive, - const string& _resumeId, - uint64_t _resumeTtl, - const framing::FieldTable& _arguments - - -) : - Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), - acquire(_acquire), - blocked(true), - windowing(true), - exclusive(_exclusive), - resumeId(_resumeId), - resumeTtl(_resumeTtl), - arguments(_arguments), - msgCredit(0), - byteCredit(0), - notifyEnabled(true), - syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), - deliveryCount(0), - mgmtObject(0) -{ - if (parent != 0 && queue.get() != 0 && queue->GetManagementObject() !=0) - { - ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); - qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); - - if (agent != 0) - { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, - !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); - agent->addObject (mgmtObject); - mgmtObject->set_creditMode("WINDOW"); - } - } -} - -ManagementObject* SemanticState::ConsumerImpl::GetManagementObject (void) const -{ - return (ManagementObject*) mgmtObject; -} - -Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) -{ - Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; - - QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]"); - - return status; -} - - -OwnershipToken* SemanticState::ConsumerImpl::getSession() -{ - return &(parent->session); -} - -bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) -{ - assertClusterSafe(); - allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); - bool sync = syncFrequency && ++deliveryCount >= syncFrequency; - if (sync) deliveryCount = 0;//reset - parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered - if (windowing || ackExpected || !acquire) { - parent->record(record); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); - } - if (mgmtObject) { mgmtObject->inc_delivered(); } - return true; -} - -bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message>) -{ - return true; -} - -bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) -{ - assertClusterSafe(); - // FIXME aconway 2009-06-08: if we have byte & message credit but - // checkCredit fails because the message is to big, we should - // remain on queue's listener list for possible smaller messages - // in future. - // - blocked = !(filter(msg) && checkCredit(msg)); - return !blocked; -} - -namespace { -struct ConsumerName { - const SemanticState::ConsumerImpl& consumer; - ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} -}; - -ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " - << pc.consumer.getParent().getSession().getSessionId(); -} -} - -void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) -{ - assertClusterSafe(); - uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; - if (msgCredit != 0xFFFFFFFF) { - msgCredit--; - } - if (byteCredit != 0xFFFFFFFF) { - byteCredit -= msg->getRequiredCredit(); - } - QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) - << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit - << " now bytes: " << byteCredit << " msgs: " << msgCredit); - -} - -bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) -{ - bool enoughCredit = msgCredit > 0 && - (byteCredit == 0xFFFFFFFF || byteCredit >= msg->getRequiredCredit()); - QPID_LOG(debug, (enoughCredit ? "Sufficient credit for " : "Insufficient credit for ") - << ConsumerName(*this) - << ", have bytes: " << byteCredit << " msgs: " << msgCredit - << ", need " << msg->getRequiredCredit() << " bytes"); - return enoughCredit; -} - -SemanticState::ConsumerImpl::~ConsumerImpl() -{ - if (mgmtObject != 0) - mgmtObject->resourceDestroy (); -} - -void SemanticState::disable(ConsumerImpl::shared_ptr c) -{ - c->disableNotify(); - if (session.isAttached()) - session.getConnection().outputTasks.removeOutputTask(c.get()); -} - -void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) -{ - Queue::shared_ptr queue = c->getQueue(); - if(queue) { - queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { - Queue::tryAutoDelete(session.getBroker(), queue); - } - } -} - -void SemanticState::cancel(ConsumerImpl::shared_ptr c) -{ - disable(c); - unsubscribe(c); -} - -void SemanticState::handle(intrusive_ptr<Message> msg) { - if (txBuffer.get()) { - TxPublish* deliverable(new TxPublish(msg)); - TxOp::shared_ptr op(deliverable); - route(msg, *deliverable); - txBuffer->enlist(op); - } else { - DeliverableMessage deliverable(msg); - route(msg, deliverable); - if (msg->isContentReleaseRequested()) { - // NOTE: The log messages in this section are used for flow-to-disk testing (which checks the log for the - // presence of these messages). Do not change these without also checking these tests. - if (msg->isContentReleaseBlocked()) { - QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << - std::hex << msg->getPersistenceId() << std::dec << ": Content release blocked"); - } else { - msg->releaseContent(); - QPID_LOG(debug, "Message id=\"" << msg->getProperties<MessageProperties>()->getMessageId() << "\"; pid=0x" << - std::hex << msg->getPersistenceId() << std::dec << ": Content released"); - } - } - } -} - -namespace -{ -const std::string nullstring; -} - -void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - - std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) - cacheExchange = session.getBroker().getExchanges().get(exchangeName); - cacheExchange->setProperties(msg); - - /* verify the userid if specified: */ - std::string id = - msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; - - if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) - { - QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); - throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); - } - - if (acl && acl->doTransferAcl()) - { - if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() )) - throw UnauthorizedAccessException(QPID_MSG(userID << " cannot publish to " << - exchangeName << " with routing-key " << msg->getRoutingKey())); - } - - cacheExchange->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); - - if (!strategy.delivered) { - //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it - //else route it to alternate exchange - if (cacheExchange->getAlternate()) { - cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); - } - if (!strategy.delivered) { - msg->destroy(); - } - } - -} - -void SemanticState::requestDispatch() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) - i->second->requestDispatch(); -} - -void SemanticState::ConsumerImpl::requestDispatch() -{ - assertClusterSafe(); - if (blocked) { - parent->session.getConnection().outputTasks.addOutputTask(this); - parent->session.getConnection().outputTasks.activateOutput(); - blocked = false; - } -} - -bool SemanticState::complete(DeliveryRecord& delivery) -{ - ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); - if (i != consumers.end()) { - i->second->complete(delivery); - } - return delivery.isRedundant(); -} - -void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) -{ - if (!delivery.isComplete()) { - delivery.complete(); - if (windowing) { - if (msgCredit != 0xFFFFFFFF) msgCredit++; - if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); - } - } -} - -void SemanticState::recover(bool requeue) -{ - if(requeue){ - //take copy and clear unacked as requeue may result in redelivery to this session - //which will in turn result in additions to unacked - DeliveryRecords copy = unacked; - unacked.clear(); - for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); - }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); - //unconfirmed messages re redelivered and therefore have their - //id adjusted, confirmed messages are not and so the ordering - //w.r.t id is lost - sort(unacked.begin(), unacked.end()); - } -} - -void SemanticState::deliver(DeliveryRecord& msg, bool sync) -{ - return deliveryAdapter.deliver(msg, sync); -} - -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) -{ - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); - } else { - return *(i->second); - } -} - -void SemanticState::setWindowMode(const std::string& destination) -{ - find(destination).setWindowMode(); -} - -void SemanticState::setCreditMode(const std::string& destination) -{ - find(destination).setCreditMode(); -} - -void SemanticState::addByteCredit(const std::string& destination, uint32_t value) -{ - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); -} - - -void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) -{ - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); -} - -void SemanticState::flush(const std::string& destination) -{ - find(destination).flush(); -} - - -void SemanticState::stop(const std::string& destination) -{ - find(destination).stop(); -} - -void SemanticState::ConsumerImpl::setWindowMode() -{ - assertClusterSafe(); - windowing = true; - if (mgmtObject){ - mgmtObject->set_creditMode("WINDOW"); - } -} - -void SemanticState::ConsumerImpl::setCreditMode() -{ - assertClusterSafe(); - windowing = false; - if (mgmtObject){ - mgmtObject->set_creditMode("CREDIT"); - } -} - -void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) -{ - assertClusterSafe(); - if (byteCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) byteCredit = value; - else byteCredit += value; - } -} - -void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) -{ - assertClusterSafe(); - if (msgCredit != 0xFFFFFFFF) { - if (value == 0xFFFFFFFF) msgCredit = value; - else msgCredit += value; - } -} - -bool SemanticState::ConsumerImpl::haveCredit() -{ - if (msgCredit && byteCredit) { - return true; - } else { - blocked = true; - return false; - } -} - -void SemanticState::ConsumerImpl::flush() -{ - while(haveCredit() && queue->dispatch(shared_from_this())) - ; - stop(); -} - -void SemanticState::ConsumerImpl::stop() -{ - assertClusterSafe(); - msgCredit = 0; - byteCredit = 0; -} - -Queue::shared_ptr SemanticState::getQueue(const string& name) const { - Queue::shared_ptr queue; - if (name.empty()) { - throw NotAllowedException(QPID_MSG("No queue name specified.")); - } else { - queue = session.getBroker().getQueues().find(name); - if (!queue) - throw NotFoundException(QPID_MSG("Queue not found: "<<name)); - } - return queue; -} - -AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ - return DeliveryRecord::findRange(unacked, first, last); -} - -void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired) -{ - AckRange range = findRange(first, last); - for_each(range.start, range.end, AcquireFunctor(acquired)); -} - -void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered) -{ - AckRange range = findRange(first, last); - //release results in the message being added to the head so want - //to release in reverse order to keep the original transfer order - DeliveryRecords::reverse_iterator start(range.end); - DeliveryRecords::reverse_iterator end(range.start); - for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered)); -} - -void SemanticState::reject(DeliveryId first, DeliveryId last) -{ - AckRange range = findRange(first, last); - for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); - //may need to remove the delivery records as well - for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) { - if (i->isRedundant()) i = unacked.erase(i); - else i++; - } -} - -bool SemanticState::ConsumerImpl::doOutput() -{ - try { - return haveCredit() && queue->dispatch(shared_from_this()); - } catch (const SessionException& e) { - throw SessionOutputException(e, parent->session.getChannel()); - } -} - -void SemanticState::ConsumerImpl::enableNotify() -{ - Mutex::ScopedLock l(lock); - assertClusterSafe(); - notifyEnabled = true; -} - -void SemanticState::ConsumerImpl::disableNotify() -{ - Mutex::ScopedLock l(lock); - notifyEnabled = false; -} - -bool SemanticState::ConsumerImpl::isNotifyEnabled() const { - Mutex::ScopedLock l(lock); - return notifyEnabled; -} - -void SemanticState::ConsumerImpl::notify() -{ - Mutex::ScopedLock l(lock); - assertClusterSafe(); - if (notifyEnabled) { - parent->session.getConnection().outputTasks.addOutputTask(this); - parent->session.getConnection().outputTasks.activateOutput(); - } -} - - -// Test that a DeliveryRecord's ID is in a sequence set and some other -// predicate on DeliveryRecord holds. -template <class Predicate> struct IsInSequenceSetAnd { - IsInSequenceSet isInSet; - Predicate predicate; - IsInSequenceSetAnd(const SequenceSet& s, Predicate p) : isInSet(s), predicate(p) {} - bool operator()(DeliveryRecord& dr) { - return isInSet(dr.getId()) && predicate(dr); - } -}; - -template<class Predicate> IsInSequenceSetAnd<Predicate> -isInSequenceSetAnd(const SequenceSet& s, Predicate p) { - return IsInSequenceSetAnd<Predicate>(s,p); -} - -void SemanticState::accepted(const SequenceSet& commands) { - assertClusterSafe(); - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.add(commands); - - if (dtxBuffer.get()) { - //if enlisted in a dtx, copy the relevant slice from - //unacked and record it against that transaction - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - - //mark the relevant messages as 'ended' in unacked - //if the messages are already completed, they can be - //removed from the record - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(&DeliveryRecord::setEnded, _1))); - unacked.erase(removed, unacked.end()); - } - } else { - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(&DeliveryRecord::accept, _1, - (TransactionContext*) 0))); - unacked.erase(removed, unacked.end()); - } -} - -void SemanticState::completed(const SequenceSet& commands) { - DeliveryRecords::iterator removed = - remove_if(unacked.begin(), unacked.end(), - isInSequenceSetAnd(commands, - bind(&SemanticState::complete, this, _1))); - unacked.erase(removed, unacked.end()); - requestDispatch(); -} - -void SemanticState::attached() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - i->second->enableNotify(); - session.getConnection().outputTasks.addOutputTask(i->second.get()); - } - session.getConnection().outputTasks.activateOutput(); -} - -void SemanticState::detached() -{ - for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { - i->second->disableNotify(); - session.getConnection().outputTasks.removeOutputTask(i->second.get()); - } -} - -}} // namespace qpid::broker |