diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxAck.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxAck.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 63 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAccept.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAck.cpp | 59 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/TxAck.h | 57 |
11 files changed, 22 insertions, 197 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 4406eccc44..530dca99a4 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -83,8 +83,8 @@ bool DeliveryRecord::after(DeliveryId tag) const{ return id > tag; } -bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{ - return range->covers(id); +bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{ + return range->contains(id); } void DeliveryRecord::redeliver(SemanticState* const session) { @@ -118,6 +118,8 @@ void DeliveryRecord::release(bool setRedelivered) queue->requeue(msg); acquired = false; setEnded(); + } else { + QPID_LOG(debug, "Ignoring release for " << id << " acquired=" << acquired << ", ended =" << ended); } } @@ -130,6 +132,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { queue->dequeue(ctxt, msg.payload); setEnded(); + QPID_LOG(debug, "Accepted " << id); } } diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 7d08a4b1f0..78dc99e3c6 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -25,7 +25,7 @@ #include <list> #include <vector> #include <ostream> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "Queue.h" #include "Consumer.h" #include "DeliveryId.h" @@ -63,7 +63,7 @@ class DeliveryRecord{ bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; - bool coveredBy(const framing::AccumulatedAck* const range) const; + bool coveredBy(const framing::SequenceSet* const range) const; void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index 0e6f94d4e0..47637369ca 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -26,7 +26,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) +DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked) { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index c61b279c42..05c4499839 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -34,7 +34,7 @@ namespace qpid { std::list<DeliveryRecord> pending; public: - DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index ad617c1bc1..bdd8edac87 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -28,7 +28,6 @@ #include "Queue.h" #include "SessionContext.h" #include "TxAccept.h" -#include "TxAck.h" #include "TxPublish.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" @@ -63,7 +62,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss) prefetchCount(0), tagGenerator("sgen"), dtxSelected(false), - accumulatedAck(0), flowActive(true), outputTasks(ss) { @@ -116,14 +114,12 @@ void SemanticState::startTx() txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void SemanticState::commit(MessageStore* const store, bool completeOnCommit) +void SemanticState::commit(MessageStore* const store) { if (!txBuffer) throw CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions")); - TxOp::shared_ptr txAck(completeOnCommit ? - static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) : - static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); + TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked))); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { accumulatedAck.clear(); @@ -377,59 +373,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { } -void SemanticState::ackCumulative(DeliveryId id) -{ - ack(id, id, true); -} - -void SemanticState::ackRange(DeliveryId first, DeliveryId last) -{ - ack(first, last, false); -} - -void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative) -{ - { - ack_iterator start = cumulative ? unacked.begin() : - find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first)); - ack_iterator end = start; - - if (cumulative || first != last) { - //need to find end (position it just after the last record in range) - end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last)); - } else if (start != unacked.end()) { - //just acked single element (move end past it) - ++end; - } - - for_each(start, end, boost::bind(&SemanticState::complete, this, _1)); - - if (txBuffer.get()) { - //in transactional mode, don't dequeue or remove, just - //maintain set of acknowledged messages: - accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); - - if (dtxBuffer.get()) { - //if enlisted in a dtx, copy the relevant slice from - //unacked and record it against that transaction: - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - //then remove that slice from the unacked record: - unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, &accumulatedAck)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - } - } else { - for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, (TransactionContext*) 0)); - unacked.erase(start, end); - } - }//end of lock scope for delivery lock (TODO this is ugly, make it prettier) - - //if the prefetch limit had previously been reached, or credit - //had expired in windowing mode there may be messages that can - //be now be delivered - requestDispatch(); -} - void SemanticState::requestDispatch() { for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) { @@ -667,7 +610,7 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last) if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just //maintain set of acknowledged messages: - accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet + accumulatedAck.add(first, last); if (dtxBuffer.get()) { //if enlisted in a dtx, copy the relevant slice from diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 84dc0fc5bb..bf3a7756b5 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -34,7 +34,7 @@ #include "TxBuffer.h" #include "qpid/framing/FrameHandler.h" -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/AggregateOutput.h" #include "qpid/shared_ptr.h" @@ -115,7 +115,7 @@ class SemanticState : public framing::FrameHandler::Chains, DtxBuffer::shared_ptr dtxBuffer; bool dtxSelected; DtxBufferMap suspendedXids; - framing::AccumulatedAck accumulatedAck; + framing::SequenceSet accumulatedAck; bool flowActive; boost::shared_ptr<Exchange> cacheExchange; sys::AggregateOutput outputTasks; @@ -125,7 +125,6 @@ class SemanticState : public framing::FrameHandler::Chains, bool checkPrefetch(boost::intrusive_ptr<Message>& msg); void checkDtxTimeout(); ConsumerImpl& find(const std::string& destination); - void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); void complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); @@ -168,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains, bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); - void commit(MessageStore* const store, bool completeOnCommit); + void commit(MessageStore* const store); void rollback(); void selectDtx(); void startDtx(const std::string& xid, DtxManager& mgr, bool join); @@ -184,10 +183,6 @@ class SemanticState : public framing::FrameHandler::Chains, void handle(boost::intrusive_ptr<Message> msg); bool doOutput() { return outputTasks.doOutput(); } - //preview only (completed == ack): - void ackCumulative(DeliveryId deliveryTag); - void ackRange(DeliveryId deliveryTag, DeliveryId endTag); - //final 0-10 spec (completed and accepted are distinct): void completed(DeliveryId deliveryTag, DeliveryId endTag); void accepted(DeliveryId deliveryTag, DeliveryId endTag); diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index 3d795014b8..e1589aea99 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -464,7 +464,7 @@ void SessionAdapter::TxHandlerImpl::select() void SessionAdapter::TxHandlerImpl::commit() { - state.commit(&getBroker().getStore(), false); + state.commit(&getBroker().getStore()); } void SessionAdapter::TxHandlerImpl::rollback() diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp index 634d066ecc..82acf61cd1 100644 --- a/cpp/src/qpid/broker/TxAccept.cpp +++ b/cpp/src/qpid/broker/TxAccept.cpp @@ -25,9 +25,9 @@ using std::bind1st; using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -using qpid::framing::AccumulatedAck; +using qpid::framing::SequenceSet; -TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : +TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) : acked(_acked), unacked(_unacked) {} bool TxAccept::prepare(TransactionContext* ctxt) throw() diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h index 011acf5d9e..9548c50c2a 100644 --- a/cpp/src/qpid/broker/TxAccept.h +++ b/cpp/src/qpid/broker/TxAccept.h @@ -24,7 +24,7 @@ #include <algorithm> #include <functional> #include <list> -#include "qpid/framing/AccumulatedAck.h" +#include "qpid/framing/SequenceSet.h" #include "DeliveryRecord.h" #include "TxOp.h" @@ -35,7 +35,7 @@ namespace qpid { * a transactional channel. */ class TxAccept : public TxOp{ - framing::AccumulatedAck& acked; + framing::SequenceSet& acked; std::list<DeliveryRecord>& unacked; public: @@ -44,7 +44,7 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp deleted file mode 100644 index 40b9b0ff33..0000000000 --- a/cpp/src/qpid/broker/TxAck.cpp +++ /dev/null @@ -1,59 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "TxAck.h" -#include "qpid/log/Statement.h" - -using std::bind1st; -using std::bind2nd; -using std::mem_fun_ref; -using namespace qpid::broker; -using qpid::framing::AccumulatedAck; - -TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : - acked(_acked), unacked(_unacked){ - -} - -bool TxAck::prepare(TransactionContext* ctxt) throw(){ - try{ - //dequeue all acked messages from their queues - for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { - if (i->coveredBy(&acked)) { - i->dequeue(ctxt); - } - } - return true; - }catch(const std::exception& e){ - QPID_LOG(error, "Failed to prepare: " << e.what()); - return false; - }catch(...){ - QPID_LOG(error, "Failed to prepare"); - return false; - } -} - -void TxAck::commit() throw(){ - //remove all acked records from the list - unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)); -} - -void TxAck::rollback() throw(){ -} diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h deleted file mode 100644 index c8383b6314..0000000000 --- a/cpp/src/qpid/broker/TxAck.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _TxAck_ -#define _TxAck_ - -#include <algorithm> -#include <functional> -#include <list> -#include "qpid/framing/AccumulatedAck.h" -#include "DeliveryRecord.h" -#include "TxOp.h" - -namespace qpid { - namespace broker { - /** - * Defines the transactional behaviour for acks received by a - * transactional channel. - */ - class TxAck : public TxOp{ - framing::AccumulatedAck& acked; - std::list<DeliveryRecord>& unacked; - - public: - /** - * @param acked a representation of the accumulation of - * acks received - * @param unacked the record of delivered messages - */ - TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~TxAck(){} - }; - } -} - - -#endif |
