diff options
Diffstat (limited to 'cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 171 |
1 files changed, 102 insertions, 69 deletions
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index c91cfba2f8..fbcb21eab9 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -70,14 +70,12 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) deliveryAdapter(da), tagGenerator("sgen"), dtxSelected(false), - authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isFederationLink()), + authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), userName(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@'))), isDefaultRealm(userID.find('@') != std::string::npos && getSession().getBroker().getOptions().realm == userID.substr(userID.find('@')+1,userID.size())), closeComplete(false) -{ - acl = getSession().getBroker().getAcl(); -} +{} SemanticState::~SemanticState() { closed(); @@ -88,7 +86,7 @@ void SemanticState::closed() { //prevent requeued messages being redelivered to consumers for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { disable(i->second); - } + } if (dtxBuffer.get()) { dtxBuffer->fail(); } @@ -107,16 +105,24 @@ bool SemanticState::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void SemanticState::consume(const string& tag, +namespace { + const std::string SEPARATOR("::"); +} + +void SemanticState::consume(const string& tag, Queue::shared_ptr queue, bool ackRequired, bool acquire, bool exclusive, const string& resumeId, uint64_t resumeTtl, const FieldTable& arguments) { - ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, acquire, exclusive, resumeId, resumeTtl, arguments)); + // "tag" is only guaranteed to be unique to this session (see AMQP 0-10 Message.subscribe, destination). + // Create a globally unique name so the broker can identify individual consumers + std::string name = session.getSessionId().str() + SEPARATOR + tag; + ConsumerImpl::shared_ptr c(new ConsumerImpl(this, name, queue, ackRequired, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->consume(c, exclusive);//may throw exception consumers[tag] = c; } -void SemanticState::cancel(const string& tag){ +bool SemanticState::cancel(const string& tag) +{ ConsumerImplMap::iterator i = consumers.find(tag); if (i != consumers.end()) { cancel(i->second); @@ -124,7 +130,13 @@ void SemanticState::cancel(const string& tag){ //should cancel all unacked messages for this consumer so that //they are not redelivered on recovery for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::cancel, _1, tag)); - + //can also remove any records that are now redundant + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, unacked.end()); + return true; + } else { + return false; } } @@ -167,8 +179,8 @@ void SemanticState::startDtx(const std::string& xid, DtxManager& mgr, bool join) if (!dtxSelected) { throw CommandInvalidException(QPID_MSG("Session has not been selected for use with dtx")); } - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + dtxBuffer.reset(new DtxBuffer(xid)); + txBuffer = dtxBuffer; if (join) { mgr.join(xid, dtxBuffer); } else { @@ -194,7 +206,7 @@ void SemanticState::endDtx(const std::string& xid, bool fail) dtxBuffer->fail(); } else { dtxBuffer->markEnded(); - } + } dtxBuffer.reset(); } @@ -236,7 +248,7 @@ void SemanticState::resumeDtx(const std::string& xid) checkDtxTimeout(); dtxBuffer->setSuspended(false); - txBuffer = boost::static_pointer_cast<TxBuffer>(dtxBuffer); + txBuffer = dtxBuffer; } void SemanticState::checkDtxTimeout() @@ -254,31 +266,33 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, - const string& _name, - Queue::shared_ptr _queue, +SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, + const string& _name, + Queue::shared_ptr _queue, bool ack, bool _acquire, bool _exclusive, + const string& _tag, const string& _resumeId, uint64_t _resumeTtl, const framing::FieldTable& _arguments -) : - Consumer(_acquire), - parent(_parent), - name(_name), - queue(_queue), - ackExpected(ack), +) : + Consumer(_name, _acquire), + parent(_parent), + queue(_queue), + ackExpected(ack), acquire(_acquire), - blocked(true), + blocked(true), windowing(true), + windowActive(false), exclusive(_exclusive), resumeId(_resumeId), + tag(_tag), resumeTtl(_resumeTtl), arguments(_arguments), - msgCredit(0), + msgCredit(0), byteCredit(0), notifyEnabled(true), syncFrequency(_arguments.getAsInt(QPID_SYNC_FREQUENCY)), @@ -289,10 +303,10 @@ SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, { ManagementAgent* agent = parent->session.getBroker().getManagementAgent(); qpid::management::Manageable* ms = dynamic_cast<qpid::management::Manageable*> (&(parent->session)); - + if (agent != 0) { - mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId() ,name, + mgmtObject = new _qmf::Subscription(agent, this, ms , queue->GetManagementObject()->getObjectId(), getTag(), !acquire, ackExpected, exclusive, ManagementAgent::toMap(arguments)); agent->addObject (mgmtObject); mgmtObject->set_creditMode("WINDOW"); @@ -324,16 +338,16 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) { assertClusterSafe(); allocateCredit(msg.payload); - DeliveryRecord record(msg, queue, name, acquire, !ackExpected, windowing); + DeliveryRecord record(msg, queue, getTag(), acquire, !ackExpected, windowing); bool sync = syncFrequency && ++deliveryCount >= syncFrequency; if (sync) deliveryCount = 0;//reset parent->deliver(record, sync); - if (!ackExpected && acquire) record.setEnded();//allows message to be released now its been delivered if (windowing || ackExpected || !acquire) { parent->record(record); - } - if (acquire && !ackExpected) { - queue->dequeue(0, msg); + } + if (acquire && !ackExpected) { // auto acquire && auto accept + queue->dequeue(0 /*ctxt*/, msg); + record.setEnded(); } if (mgmtObject) { mgmtObject->inc_delivered(); } return true; @@ -351,7 +365,7 @@ bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg) // checkCredit fails because the message is to big, we should // remain on queue's listener list for possible smaller messages // in future. - // + // blocked = !(filter(msg) && checkCredit(msg)); return !blocked; } @@ -363,7 +377,7 @@ struct ConsumerName { }; ostream& operator<<(ostream& o, const ConsumerName& pc) { - return o << pc.consumer.getName() << " on " + return o << pc.consumer.getTag() << " on " << pc.consumer.getParent().getSession().getSessionId(); } } @@ -372,7 +386,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) { assertClusterSafe(); uint32_t originalMsgCredit = msgCredit; - uint32_t originalByteCredit = byteCredit; + uint32_t originalByteCredit = byteCredit; if (msgCredit != 0xFFFFFFFF) { msgCredit--; } @@ -382,7 +396,7 @@ void SemanticState::ConsumerImpl::allocateCredit(intrusive_ptr<Message>& msg) QPID_LOG(debug, "Credit allocated for " << ConsumerName(*this) << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit << " now bytes: " << byteCredit << " msgs: " << msgCredit); - + } bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) @@ -396,7 +410,7 @@ bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticState::ConsumerImpl::~ConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -414,7 +428,7 @@ void SemanticState::unsubscribe(ConsumerImpl::shared_ptr c) Queue::shared_ptr queue = c->getQueue(); if(queue) { queue->cancel(c); - if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { + if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) { Queue::tryAutoDelete(session.getBroker(), queue); } } @@ -456,23 +470,23 @@ const std::string nullstring; } void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { - msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); - + msg->computeExpiration(getSession().getBroker().getExpiryPolicy()); + std::string exchangeName = msg->getExchangeName(); - if (!cacheExchange || cacheExchange->getName() != exchangeName) + if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) cacheExchange = session.getBroker().getExchanges().get(exchangeName); cacheExchange->setProperties(msg); /* verify the userid if specified: */ std::string id = msg->hasProperties<MessageProperties>() ? msg->getProperties<MessageProperties>()->getUserId() : nullstring; - if (authMsg && !id.empty() && !(id == userID || (isDefaultRealm && id == userName))) { QPID_LOG(debug, "authorised user id : " << userID << " but user id in message declared as " << id); throw UnauthorizedAccessException(QPID_MSG("authorised user id : " << userID << " but user id in message declared as " << id)); } + AclModule* acl = getSession().getBroker().getAcl(); if (acl && acl->doTransferAcl()) { if (!acl->authorise(getSession().getConnection().getUserId(),acl::ACT_PUBLISH,acl::OBJ_EXCHANGE,exchangeName, msg->getRoutingKey() )) @@ -484,7 +498,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { if (!strategy.delivered) { //TODO:if discard-unroutable, just drop it - //TODO:else if accept-mode is explicit, reject it + //TODO:else if accept-mode is explicit, reject it //else route it to alternate exchange if (cacheExchange->getAlternate()) { cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders()); @@ -513,7 +527,7 @@ void SemanticState::ConsumerImpl::requestDispatch() } bool SemanticState::complete(DeliveryRecord& delivery) -{ +{ ConsumerImplMap::iterator i = consumers.find(delivery.getTag()); if (i != consumers.end()) { i->second->complete(delivery); @@ -525,7 +539,7 @@ void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); - if (windowing) { + if (windowing && windowActive) { if (msgCredit != 0xFFFFFFFF) msgCredit++; if (byteCredit != 0xFFFFFFFF) byteCredit += delivery.getCredit(); } @@ -541,7 +555,7 @@ void SemanticState::recover(bool requeue) unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); + for_each(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::redeliver, _1, this)); //unconfirmed messages re redelivered and therefore have their //id adjusted, confirmed messages are not and so the ordering //w.r.t id is lost @@ -554,50 +568,61 @@ void SemanticState::deliver(DeliveryRecord& msg, bool sync) return deliveryAdapter.deliver(msg, sync); } -SemanticState::ConsumerImpl& SemanticState::find(const std::string& destination) +const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { - ConsumerImplMap::iterator i = consumers.find(destination); - if (i == consumers.end()) { - throw NotFoundException(QPID_MSG("Unknown destination " << destination)); + ConsumerImpl::shared_ptr consumer; + if (!find(destination, consumer)) { + throw NotFoundException(QPID_MSG("Unknown destination " << destination << " session=" << session.getSessionId())); } else { - return *(i->second); + return consumer; + } +} + +bool SemanticState::find(const std::string& destination, ConsumerImpl::shared_ptr& consumer) const +{ + // @todo KAG gsim: shouldn't the consumers map be locked???? + ConsumerImplMap::const_iterator i = consumers.find(destination); + if (i == consumers.end()) { + return false; } + consumer = i->second; + return true; } void SemanticState::setWindowMode(const std::string& destination) { - find(destination).setWindowMode(); + find(destination)->setWindowMode(); } void SemanticState::setCreditMode(const std::string& destination) { - find(destination).setCreditMode(); + find(destination)->setCreditMode(); } void SemanticState::addByteCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addByteCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addByteCredit(value); + c->requestDispatch(); } void SemanticState::addMessageCredit(const std::string& destination, uint32_t value) { - ConsumerImpl& c = find(destination); - c.addMessageCredit(value); - c.requestDispatch(); + ConsumerImpl::shared_ptr c = find(destination); + c->addMessageCredit(value); + c->requestDispatch(); } void SemanticState::flush(const std::string& destination) { - find(destination).flush(); + find(destination)->flush(); } void SemanticState::stop(const std::string& destination) { - find(destination).stop(); + find(destination)->stop(); } void SemanticState::ConsumerImpl::setWindowMode() @@ -621,6 +646,7 @@ void SemanticState::ConsumerImpl::setCreditMode() void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (byteCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) byteCredit = value; else byteCredit += value; @@ -630,6 +656,7 @@ void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) { assertClusterSafe(); + if (windowing) windowActive = true; if (msgCredit != 0xFFFFFFFF) { if (value == 0xFFFFFFFF) msgCredit = value; else msgCredit += value; @@ -650,7 +677,8 @@ void SemanticState::ConsumerImpl::flush() { while(haveCredit() && queue->dispatch(shared_from_this())) ; - stop(); + msgCredit = 0; + byteCredit = 0; } void SemanticState::ConsumerImpl::stop() @@ -658,6 +686,7 @@ void SemanticState::ConsumerImpl::stop() assertClusterSafe(); msgCredit = 0; byteCredit = 0; + windowActive = false; } Queue::shared_ptr SemanticState::getQueue(const string& name) const { @@ -673,7 +702,7 @@ Queue::shared_ptr SemanticState::getQueue(const string& name) const { } AckRange SemanticState::findRange(DeliveryId first, DeliveryId last) -{ +{ return DeliveryRecord::findRange(unacked, first, last); } @@ -691,14 +720,21 @@ void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedeliver DeliveryRecords::reverse_iterator start(range.end); DeliveryRecords::reverse_iterator end(range.start); for_each(start, end, boost::bind(&DeliveryRecord::release, _1, setRedelivered)); + + DeliveryRecords::iterator removed = + remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1)); + unacked.erase(removed, range.end); } void SemanticState::reject(DeliveryId first, DeliveryId last) { AckRange range = findRange(first, last); for_each(range.start, range.end, mem_fun_ref(&DeliveryRecord::reject)); - //need to remove the delivery records as well - unacked.erase(range.start, range.end); + //may need to remove the delivery records as well + for (DeliveryRecords::iterator i = range.start; i != unacked.end() && i->getId() <= last; ) { + if (i->isRedundant()) i = unacked.erase(i); + else i++; + } } bool SemanticState::ConsumerImpl::doOutput() @@ -761,13 +797,13 @@ void SemanticState::accepted(const SequenceSet& commands) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: accumulatedAck.add(commands); - + if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); - dtxBuffer->enlist(txAck); + dtxBuffer->enlist(txAck); //mark the relevant messages as 'ended' in unacked //if the messages are already completed, they can be @@ -789,7 +825,6 @@ void SemanticState::accepted(const SequenceSet& commands) { } void SemanticState::completed(const SequenceSet& commands) { - assertClusterSafe(); DeliveryRecords::iterator removed = remove_if(unacked.begin(), unacked.end(), isInSequenceSetAnd(commands, @@ -800,7 +835,6 @@ void SemanticState::completed(const SequenceSet& commands) { void SemanticState::attached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->enableNotify(); session.getConnection().outputTasks.addOutputTask(i->second.get()); @@ -810,7 +844,6 @@ void SemanticState::attached() void SemanticState::detached() { - assertClusterSafe(); for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { i->second->disableNotify(); session.getConnection().outputTasks.removeOutputTask(i->second.get()); |