summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp7
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h4
-rw-r--r--cpp/src/qpid/broker/DtxAck.cpp2
-rw-r--r--cpp/src/qpid/broker/DtxAck.h4
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp63
-rw-r--r--cpp/src/qpid/broker/SemanticState.h11
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp2
-rw-r--r--cpp/src/qpid/broker/TxAccept.cpp4
-rw-r--r--cpp/src/qpid/broker/TxAccept.h6
-rw-r--r--cpp/src/qpid/broker/TxAck.cpp59
-rw-r--r--cpp/src/qpid/broker/TxAck.h57
11 files changed, 22 insertions, 197 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 4406eccc44..530dca99a4 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -83,8 +83,8 @@ bool DeliveryRecord::after(DeliveryId tag) const{
return id > tag;
}
-bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const{
- return range->covers(id);
+bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{
+ return range->contains(id);
}
void DeliveryRecord::redeliver(SemanticState* const session) {
@@ -118,6 +118,8 @@ void DeliveryRecord::release(bool setRedelivered)
queue->requeue(msg);
acquired = false;
setEnded();
+ } else {
+ QPID_LOG(debug, "Ignoring release for " << id << " acquired=" << acquired << ", ended =" << ended);
}
}
@@ -130,6 +132,7 @@ void DeliveryRecord::accept(TransactionContext* ctxt) {
if (acquired && !ended) {
queue->dequeue(ctxt, msg.payload);
setEnded();
+ QPID_LOG(debug, "Accepted " << id);
}
}
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 7d08a4b1f0..78dc99e3c6 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -25,7 +25,7 @@
#include <list>
#include <vector>
#include <ostream>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "Queue.h"
#include "Consumer.h"
#include "DeliveryId.h"
@@ -63,7 +63,7 @@ class DeliveryRecord{
bool matches(DeliveryId tag) const;
bool matchOrAfter(DeliveryId tag) const;
bool after(DeliveryId tag) const;
- bool coveredBy(const framing::AccumulatedAck* const range) const;
+ bool coveredBy(const framing::SequenceSet* const range) const;
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp
index 0e6f94d4e0..47637369ca 100644
--- a/cpp/src/qpid/broker/DtxAck.cpp
+++ b/cpp/src/qpid/broker/DtxAck.cpp
@@ -26,7 +26,7 @@ using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-DtxAck::DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+DtxAck::DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked)
{
remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h
index c61b279c42..05c4499839 100644
--- a/cpp/src/qpid/broker/DtxAck.h
+++ b/cpp/src/qpid/broker/DtxAck.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -34,7 +34,7 @@ namespace qpid {
std::list<DeliveryRecord> pending;
public:
- DtxAck(const framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ DtxAck(const framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index ad617c1bc1..bdd8edac87 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -28,7 +28,6 @@
#include "Queue.h"
#include "SessionContext.h"
#include "TxAccept.h"
-#include "TxAck.h"
#include "TxPublish.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -63,7 +62,6 @@ SemanticState::SemanticState(DeliveryAdapter& da, SessionContext& ss)
prefetchCount(0),
tagGenerator("sgen"),
dtxSelected(false),
- accumulatedAck(0),
flowActive(true),
outputTasks(ss)
{
@@ -116,14 +114,12 @@ void SemanticState::startTx()
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void SemanticState::commit(MessageStore* const store, bool completeOnCommit)
+void SemanticState::commit(MessageStore* const store)
{
if (!txBuffer) throw
CommandInvalidException(QPID_MSG("Session has not been selected for use with transactions"));
- TxOp::shared_ptr txAck(completeOnCommit ?
- static_cast<TxOp*>(new TxAck(accumulatedAck, unacked)) :
- static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
+ TxOp::shared_ptr txAck(static_cast<TxOp*>(new TxAccept(accumulatedAck, unacked)));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
accumulatedAck.clear();
@@ -377,59 +373,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
}
-void SemanticState::ackCumulative(DeliveryId id)
-{
- ack(id, id, true);
-}
-
-void SemanticState::ackRange(DeliveryId first, DeliveryId last)
-{
- ack(first, last, false);
-}
-
-void SemanticState::ack(DeliveryId first, DeliveryId last, bool cumulative)
-{
- {
- ack_iterator start = cumulative ? unacked.begin() :
- find_if(unacked.begin(), unacked.end(), boost::bind(&DeliveryRecord::matchOrAfter, _1, first));
- ack_iterator end = start;
-
- if (cumulative || first != last) {
- //need to find end (position it just after the last record in range)
- end = find_if(start, unacked.end(), boost::bind(&DeliveryRecord::after, _1, last));
- } else if (start != unacked.end()) {
- //just acked single element (move end past it)
- ++end;
- }
-
- for_each(start, end, boost::bind(&SemanticState::complete, this, _1));
-
- if (txBuffer.get()) {
- //in transactional mode, don't dequeue or remove, just
- //maintain set of acknowledged messages:
- accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
-
- if (dtxBuffer.get()) {
- //if enlisted in a dtx, copy the relevant slice from
- //unacked and record it against that transaction:
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- //then remove that slice from the unacked record:
- unacked.remove_if(boost::bind(&DeliveryRecord::coveredBy, _1, &accumulatedAck));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- }
- } else {
- for_each(start, end, boost::bind(&DeliveryRecord::dequeue, _1, (TransactionContext*) 0));
- unacked.erase(start, end);
- }
- }//end of lock scope for delivery lock (TODO this is ugly, make it prettier)
-
- //if the prefetch limit had previously been reached, or credit
- //had expired in windowing mode there may be messages that can
- //be now be delivered
- requestDispatch();
-}
-
void SemanticState::requestDispatch()
{
for (ConsumerImplMap::iterator i = consumers.begin(); i != consumers.end(); i++) {
@@ -667,7 +610,7 @@ void SemanticState::accepted(DeliveryId first, DeliveryId last)
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
//maintain set of acknowledged messages:
- accumulatedAck.update(first, last);//TODO convert accumulatedAck to SequenceSet
+ accumulatedAck.add(first, last);
if (dtxBuffer.get()) {
//if enlisted in a dtx, copy the relevant slice from
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 84dc0fc5bb..bf3a7756b5 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -34,7 +34,7 @@
#include "TxBuffer.h"
#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/AggregateOutput.h"
#include "qpid/shared_ptr.h"
@@ -115,7 +115,7 @@ class SemanticState : public framing::FrameHandler::Chains,
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
- framing::AccumulatedAck accumulatedAck;
+ framing::SequenceSet accumulatedAck;
bool flowActive;
boost::shared_ptr<Exchange> cacheExchange;
sys::AggregateOutput outputTasks;
@@ -125,7 +125,6 @@ class SemanticState : public framing::FrameHandler::Chains,
bool checkPrefetch(boost::intrusive_ptr<Message>& msg);
void checkDtxTimeout();
ConsumerImpl& find(const std::string& destination);
- void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
void complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
@@ -168,7 +167,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected);
void startTx();
- void commit(MessageStore* const store, bool completeOnCommit);
+ void commit(MessageStore* const store);
void rollback();
void selectDtx();
void startDtx(const std::string& xid, DtxManager& mgr, bool join);
@@ -184,10 +183,6 @@ class SemanticState : public framing::FrameHandler::Chains,
void handle(boost::intrusive_ptr<Message> msg);
bool doOutput() { return outputTasks.doOutput(); }
- //preview only (completed == ack):
- void ackCumulative(DeliveryId deliveryTag);
- void ackRange(DeliveryId deliveryTag, DeliveryId endTag);
-
//final 0-10 spec (completed and accepted are distinct):
void completed(DeliveryId deliveryTag, DeliveryId endTag);
void accepted(DeliveryId deliveryTag, DeliveryId endTag);
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index 3d795014b8..e1589aea99 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -464,7 +464,7 @@ void SessionAdapter::TxHandlerImpl::select()
void SessionAdapter::TxHandlerImpl::commit()
{
- state.commit(&getBroker().getStore(), false);
+ state.commit(&getBroker().getStore());
}
void SessionAdapter::TxHandlerImpl::rollback()
diff --git a/cpp/src/qpid/broker/TxAccept.cpp b/cpp/src/qpid/broker/TxAccept.cpp
index 634d066ecc..82acf61cd1 100644
--- a/cpp/src/qpid/broker/TxAccept.cpp
+++ b/cpp/src/qpid/broker/TxAccept.cpp
@@ -25,9 +25,9 @@ using std::bind1st;
using std::bind2nd;
using std::mem_fun_ref;
using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
+using qpid::framing::SequenceSet;
-TxAccept::TxAccept(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
+TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
acked(_acked), unacked(_unacked) {}
bool TxAccept::prepare(TransactionContext* ctxt) throw()
diff --git a/cpp/src/qpid/broker/TxAccept.h b/cpp/src/qpid/broker/TxAccept.h
index 011acf5d9e..9548c50c2a 100644
--- a/cpp/src/qpid/broker/TxAccept.h
+++ b/cpp/src/qpid/broker/TxAccept.h
@@ -24,7 +24,7 @@
#include <algorithm>
#include <functional>
#include <list>
-#include "qpid/framing/AccumulatedAck.h"
+#include "qpid/framing/SequenceSet.h"
#include "DeliveryRecord.h"
#include "TxOp.h"
@@ -35,7 +35,7 @@ namespace qpid {
* a transactional channel.
*/
class TxAccept : public TxOp{
- framing::AccumulatedAck& acked;
+ framing::SequenceSet& acked;
std::list<DeliveryRecord>& unacked;
public:
@@ -44,7 +44,7 @@ namespace qpid {
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>& unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp
deleted file mode 100644
index 40b9b0ff33..0000000000
--- a/cpp/src/qpid/broker/TxAck.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "TxAck.h"
-#include "qpid/log/Statement.h"
-
-using std::bind1st;
-using std::bind2nd;
-using std::mem_fun_ref;
-using namespace qpid::broker;
-using qpid::framing::AccumulatedAck;
-
-TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) :
- acked(_acked), unacked(_unacked){
-
-}
-
-bool TxAck::prepare(TransactionContext* ctxt) throw(){
- try{
- //dequeue all acked messages from their queues
- for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) {
- if (i->coveredBy(&acked)) {
- i->dequeue(ctxt);
- }
- }
- return true;
- }catch(const std::exception& e){
- QPID_LOG(error, "Failed to prepare: " << e.what());
- return false;
- }catch(...){
- QPID_LOG(error, "Failed to prepare");
- return false;
- }
-}
-
-void TxAck::commit() throw(){
- //remove all acked records from the list
- unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
-}
-
-void TxAck::rollback() throw(){
-}
diff --git a/cpp/src/qpid/broker/TxAck.h b/cpp/src/qpid/broker/TxAck.h
deleted file mode 100644
index c8383b6314..0000000000
--- a/cpp/src/qpid/broker/TxAck.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _TxAck_
-#define _TxAck_
-
-#include <algorithm>
-#include <functional>
-#include <list>
-#include "qpid/framing/AccumulatedAck.h"
-#include "DeliveryRecord.h"
-#include "TxOp.h"
-
-namespace qpid {
- namespace broker {
- /**
- * Defines the transactional behaviour for acks received by a
- * transactional channel.
- */
- class TxAck : public TxOp{
- framing::AccumulatedAck& acked;
- std::list<DeliveryRecord>& unacked;
-
- public:
- /**
- * @param acked a representation of the accumulation of
- * acks received
- * @param unacked the record of delivered messages
- */
- TxAck(framing::AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~TxAck(){}
- };
- }
-}
-
-
-#endif