summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-10-29 15:23:36 +0000
committerAlan Conway <aconway@apache.org>2013-10-29 15:23:36 +0000
commit4a3eb469d5eb8c97c719c70e42814cd703e12fbd (patch)
tree5fefbf7b5c2f327275c8bd929ed83281ec54114d
parentfd23db50312860de7585001588fae44b4f8e9480 (diff)
downloadqpid-python-4a3eb469d5eb8c97c719c70e42814cd703e12fbd.tar.gz
QPID-5139: Make TxBuffer inherit from AsyncCompletion.
Switched from shared_ptr to intrusive_ptr for TxBuffer and DtxBuffer. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1536752 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/BrokerObserver.h4
-rw-r--r--cpp/src/qpid/broker/BrokerObservers.h4
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.h5
-rw-r--r--cpp/src/qpid/broker/DtxManager.cpp6
-rw-r--r--cpp/src/qpid/broker/DtxManager.h6
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp4
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h6
-rw-r--r--cpp/src/qpid/broker/RecoverableMessageImpl.h4
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp18
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.h14
-rw-r--r--cpp/src/qpid/broker/TxBuffer.h32
-rw-r--r--cpp/src/qpid/ha/Primary.cpp10
-rw-r--r--cpp/src/qpid/ha/Primary.h4
-rw-r--r--cpp/src/qpid/ha/TxReplicator.h2
-rw-r--r--cpp/src/tests/DtxWorkRecordTest.cpp24
-rw-r--r--cpp/src/tests/TransactionObserverTest.cpp2
17 files changed, 76 insertions, 71 deletions
diff --git a/cpp/src/qpid/broker/BrokerObserver.h b/cpp/src/qpid/broker/BrokerObserver.h
index 2249d33e64..a9573b9e12 100644
--- a/cpp/src/qpid/broker/BrokerObserver.h
+++ b/cpp/src/qpid/broker/BrokerObserver.h
@@ -60,8 +60,8 @@ class BrokerObserver
const boost::shared_ptr<Queue>& ,
const std::string& /*key*/,
const framing::FieldTable& /*args*/) {}
- virtual void startTx(const boost::shared_ptr<TxBuffer>&) {}
- virtual void startDtx(const boost::shared_ptr<DtxBuffer>&) {}
+ virtual void startTx(const boost::intrusive_ptr<TxBuffer>&) {}
+ virtual void startDtx(const boost::intrusive_ptr<DtxBuffer>&) {}
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/BrokerObservers.h b/cpp/src/qpid/broker/BrokerObservers.h
index 5ba7bac890..8a9bdd4fd4 100644
--- a/cpp/src/qpid/broker/BrokerObservers.h
+++ b/cpp/src/qpid/broker/BrokerObservers.h
@@ -64,10 +64,10 @@ class BrokerObservers : public BrokerObserver,
each(boost::bind(
&BrokerObserver::unbind, _1, exchange, queue, key, args));
}
- void startTx(const boost::shared_ptr<TxBuffer>& tx) {
+ void startTx(const boost::intrusive_ptr<TxBuffer>& tx) {
each(boost::bind(&BrokerObserver::startTx, _1, tx));
}
- void startDtx(const boost::shared_ptr<DtxBuffer>& dtx) {
+ void startDtx(const boost::intrusive_ptr<DtxBuffer>& dtx) {
each(boost::bind(&BrokerObserver::startDtx, _1, dtx));
}
};
diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h
index cabd37647a..21ca76c8f4 100644
--- a/cpp/src/qpid/broker/DtxBuffer.h
+++ b/cpp/src/qpid/broker/DtxBuffer.h
@@ -27,7 +27,8 @@
namespace qpid {
namespace broker {
-class DtxBuffer : public TxBuffer{
+class DtxBuffer : public TxBuffer {
+
mutable sys::Mutex lock;
const std::string xid;
bool ended;
@@ -36,8 +37,6 @@ class DtxBuffer : public TxBuffer{
bool expired;
public:
- typedef boost::shared_ptr<DtxBuffer> shared_ptr;
-
QPID_BROKER_EXTERN DtxBuffer(
const std::string& xid = "",
bool ended=false, bool suspended=false, bool failed=false, bool expired=false);
diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp
index 5233e07b2b..5ba1ce4dac 100644
--- a/cpp/src/qpid/broker/DtxManager.cpp
+++ b/cpp/src/qpid/broker/DtxManager.cpp
@@ -66,17 +66,17 @@ DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {}
DtxManager::~DtxManager() {}
-void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
+void DtxManager::start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> ops)
{
createWork(xid)->add(ops);
}
-void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops)
+void DtxManager::join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> ops)
{
getWork(xid)->add(ops);
}
-void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
+void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> ops)
{
createWork(xid)->recover(txn, ops);
}
diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h
index 81175e5dc3..000fc7b4b8 100644
--- a/cpp/src/qpid/broker/DtxManager.h
+++ b/cpp/src/qpid/broker/DtxManager.h
@@ -51,9 +51,9 @@ class DtxManager{
public:
DtxManager(sys::Timer&);
~DtxManager();
- void start(const std::string& xid, DtxBuffer::shared_ptr work);
- void join(const std::string& xid, DtxBuffer::shared_ptr work);
- void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
+ void start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work);
+ void join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work);
+ void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> work);
bool prepare(const std::string& xid);
bool commit(const std::string& xid, bool onePhase);
void rollback(const std::string& xid);
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index ad02892895..04d19dc43e 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -122,7 +122,7 @@ void DtxWorkRecord::rollback()
abort();
}
-void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
+void DtxWorkRecord::add(boost::intrusive_ptr<DtxBuffer> ops)
{
Mutex::ScopedLock locker(lock);
if (expired) {
@@ -162,7 +162,7 @@ void DtxWorkRecord::abort()
std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
}
-void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer::shared_ptr ops)
+void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, boost::intrusive_ptr<DtxBuffer> ops)
{
add(ops);
txn = _txn;
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index b38af907c5..91fda797d1 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -46,7 +46,7 @@ struct DtxTimeout;
*/
class DtxWorkRecord
{
- typedef std::vector<DtxBuffer::shared_ptr> Work;
+ typedef std::vector<boost::intrusive_ptr<DtxBuffer> >Work;
const std::string xid;
TransactionalStore* const store;
@@ -69,8 +69,8 @@ public:
QPID_BROKER_EXTERN bool prepare();
QPID_BROKER_EXTERN bool commit(bool onePhase);
QPID_BROKER_EXTERN void rollback();
- QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops);
- void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
+ QPID_BROKER_EXTERN void add(boost::intrusive_ptr<DtxBuffer> ops);
+ void recover(std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> ops);
void timedout();
void setTimeout(boost::intrusive_ptr<DtxTimeout> t);
boost::intrusive_ptr<DtxTimeout> getTimeout();
diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h
index 900eae4e1e..6f0bd8e4a9 100644
--- a/cpp/src/qpid/broker/RecoverableMessageImpl.h
+++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h
@@ -41,8 +41,8 @@ public:
bool loadContent(uint64_t available);
void decodeContent(framing::Buffer& buffer);
void recover(boost::shared_ptr<Queue> queue);
- void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
- void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+ void enqueue(boost::intrusive_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+ void dequeue(boost::intrusive_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
Message getMessage();
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 4bfdfe76f4..19cb2f30c3 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -60,8 +60,8 @@ public:
const QueueSettings& getSettings() const;
void addArgument(const std::string& key, const types::Variant& value);
void recover(RecoverableMessage::shared_ptr msg);
- void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
- void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg);
+ void enqueue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr msg);
+ void dequeue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr msg);
};
@@ -88,9 +88,9 @@ public:
class RecoverableTransactionImpl : public RecoverableTransaction
{
- DtxBuffer::shared_ptr buffer;
+ boost::intrusive_ptr<DtxBuffer> buffer;
public:
- RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {}
+ RecoverableTransactionImpl(boost::intrusive_ptr<DtxBuffer> _buffer) : buffer(_buffer) {}
void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message);
};
@@ -129,7 +129,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
std::auto_ptr<TPCTransactionContext> txn)
{
- DtxBuffer::shared_ptr buffer(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> buffer(new DtxBuffer());
dtxMgr.recover(xid, txn, buffer);
return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer));
}
@@ -255,22 +255,22 @@ void RecoverableExchangeImpl::bind(const string& queueName,
queue->bound(exchange->getName(), key, args);
}
-void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+void RecoverableMessageImpl::dequeue(boost::intrusive_ptr<DtxBuffer> buffer, Queue::shared_ptr queue)
{
buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg)));
}
-void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue)
+void RecoverableMessageImpl::enqueue(boost::intrusive_ptr<DtxBuffer> buffer, Queue::shared_ptr queue)
{
buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg)));
}
-void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+void RecoverableQueueImpl::dequeue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr message)
{
dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue);
}
-void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message)
+void RecoverableQueueImpl::enqueue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr message)
{
dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue);
}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 18c29c09f9..469fe760a0 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -166,7 +166,7 @@ bool SemanticState::cancel(const string& tag)
void SemanticState::startTx()
{
- txBuffer = TxBuffer::shared_ptr(new TxBuffer());
+ txBuffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer());
session.getBroker().getBrokerObservers().startTx(txBuffer);
session.startTx(); //just to update statistics
}
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 3105398426..4375a3f0f1 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -84,7 +84,7 @@ class SemanticState : private boost::noncopyable {
public:
typedef SemanticStateConsumerImpl ConsumerImpl;
- typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+ typedef std::map<std::string, boost::intrusive_ptr<DtxBuffer> > DtxBufferMap;
private:
typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap;
@@ -95,8 +95,8 @@ class SemanticState : private boost::noncopyable {
ConsumerImplMap consumers;
NameGenerator tagGenerator;
DeliveryRecords unacked;
- TxBuffer::shared_ptr txBuffer;
- DtxBuffer::shared_ptr dtxBuffer;
+ boost::intrusive_ptr<TxBuffer> txBuffer;
+ boost::intrusive_ptr<DtxBuffer> dtxBuffer;
bool dtxSelected;
DtxBufferMap suspendedXids;
framing::SequenceSet accumulatedAck;
@@ -179,10 +179,10 @@ class SemanticState : private boost::noncopyable {
DeliveryRecords& getUnacked() { return unacked; }
framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
- TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
- DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; }
- void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
- void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
+ boost::intrusive_ptr<TxBuffer> getTxBuffer() const { return txBuffer; }
+ boost::intrusive_ptr<DtxBuffer> getDtxBuffer() const { return dtxBuffer; }
+ void setTxBuffer(const boost::intrusive_ptr<TxBuffer>& txb) { txBuffer = txb; }
+ void setDtxBuffer(const boost::intrusive_ptr<DtxBuffer>& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; }
void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
void record(const DeliveryRecord& delivery);
DtxBufferMap& getSuspendedXids() { return suspendedXids; }
diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h
index 4570c570a2..f4cdcb3ae2 100644
--- a/cpp/src/qpid/broker/TxBuffer.h
+++ b/cpp/src/qpid/broker/TxBuffer.h
@@ -1,3 +1,6 @@
+#ifndef QPID_BROKER_TXBUFFER_H
+#define QPID_BROKER_TXBUFFER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,15 +21,19 @@
* under the License.
*
*/
-#ifndef _TxBuffer_
-#define _TxBuffer_
-#include <algorithm>
-#include <functional>
-#include <vector>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/broker/TxOp.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include <algorithm>
+#include <functional>
+#include <vector>
+
+
+namespace qpid {
+namespace broker {
+class TransactionObserver;
/**
* Represents a single transaction. As such, an instance of this class
@@ -56,21 +63,17 @@
* prepare. There is a little more flexibility with (2) but any
* changes made during prepare should be subject to the control of the
* TransactionalStore in use.
+ *
+ * TxBuffer inherits AsyncCompletion because transactions can be completed
+ * asynchronously if the broker is part of a HA cluster.
*/
-namespace qpid {
-
-namespace broker {
-class TransactionObserver;
-
-class TxBuffer {
+class TxBuffer : public AsyncCompletion {
private:
typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
std::vector<TxOp::shared_ptr> ops;
boost::shared_ptr<TransactionObserver> observer;
public:
- typedef boost::shared_ptr<TxBuffer> shared_ptr;
-
QPID_BROKER_EXTERN TxBuffer();
/**
@@ -128,4 +131,5 @@ class TxBuffer {
}} // namespace qpid::broker
-#endif
+
+#endif /*!QPID_BROKER_TXBUFFER_H*/
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index e7fdc5035f..3a7ab3b0fc 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -40,12 +40,14 @@
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
namespace qpid {
namespace ha {
using sys::Mutex;
using boost::shared_ptr;
+using boost::intrusive_ptr;
using namespace std;
using namespace framing;
@@ -69,8 +71,8 @@ class PrimaryBrokerObserver : public broker::BrokerObserver
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
- void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
- void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
+ void startTx(const intrusive_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
+ void startDtx(const intrusive_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
private:
Primary& primary;
@@ -406,11 +408,11 @@ shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() {
return observer;
}
-void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) {
+void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) {
tx->setObserver(makeTxObserver());
}
-void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) {
+void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) {
dtx->setObserver(makeTxObserver());
}
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index d1350ab261..97b7f956b7 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -93,8 +93,8 @@ class Primary : public Role
void queueDestroy(const QueuePtr&);
void exchangeCreate(const ExchangePtr&);
void exchangeDestroy(const ExchangePtr&);
- void startTx(const boost::shared_ptr<broker::TxBuffer>&);
- void startDtx(const boost::shared_ptr<broker::DtxBuffer>&);
+ void startTx(const boost::intrusive_ptr<broker::TxBuffer>&);
+ void startDtx(const boost::intrusive_ptr<broker::DtxBuffer>&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h
index 214b2a8a5f..bd80443e50 100644
--- a/cpp/src/qpid/ha/TxReplicator.h
+++ b/cpp/src/qpid/ha/TxReplicator.h
@@ -89,7 +89,7 @@ class TxReplicator : public QueueReplicator {
std::string logPrefix;
TxEnqueueEvent enq; // Enqueue data for next deliver.
- boost::shared_ptr<broker::TxBuffer> txBuffer;
+ boost::intrusive_ptr<broker::TxBuffer> txBuffer;
broker::MessageStore* store;
std::auto_ptr<broker::TransactionContext> context;
framing::ChannelId channel; // Channel to send prepare-complete.
diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp
index 9d7666dca4..bcb3fc14a1 100644
--- a/cpp/src/tests/DtxWorkRecordTest.cpp
+++ b/cpp/src/tests/DtxWorkRecordTest.cpp
@@ -41,10 +41,10 @@ QPID_AUTO_TEST_CASE(testOnePhaseCommit){
MockTxOp::shared_ptr opB(new MockTxOp());
opB->expectPrepare().expectCommit();
- DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer());
bufferA->enlist(static_pointer_cast<TxOp>(opA));
bufferA->markEnded();
- DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer());
bufferB->enlist(static_pointer_cast<TxOp>(opB));
bufferB->markEnded();
@@ -71,13 +71,13 @@ QPID_AUTO_TEST_CASE(testFailOnOnePhaseCommit){
MockTxOp::shared_ptr opC(new MockTxOp());
opC->expectRollback();
- DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer());
bufferA->enlist(static_pointer_cast<TxOp>(opA));
bufferA->markEnded();
- DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer());
bufferB->enlist(static_pointer_cast<TxOp>(opB));
bufferB->markEnded();
- DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferC(new DtxBuffer());
bufferC->enlist(static_pointer_cast<TxOp>(opC));
bufferC->markEnded();
@@ -105,10 +105,10 @@ QPID_AUTO_TEST_CASE(testTwoPhaseCommit){
MockTxOp::shared_ptr opB(new MockTxOp());
opB->expectPrepare().expectCommit();
- DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer());
bufferA->enlist(static_pointer_cast<TxOp>(opA));
bufferA->markEnded();
- DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer());
bufferB->enlist(static_pointer_cast<TxOp>(opB));
bufferB->markEnded();
@@ -136,13 +136,13 @@ QPID_AUTO_TEST_CASE(testFailOnTwoPhaseCommit){
MockTxOp::shared_ptr opC(new MockTxOp());
opC->expectRollback();
- DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer());
bufferA->enlist(static_pointer_cast<TxOp>(opA));
bufferA->markEnded();
- DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer());
bufferB->enlist(static_pointer_cast<TxOp>(opB));
bufferB->markEnded();
- DtxBuffer::shared_ptr bufferC(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferC(new DtxBuffer());
bufferC->enlist(static_pointer_cast<TxOp>(opC));
bufferC->markEnded();
@@ -168,10 +168,10 @@ QPID_AUTO_TEST_CASE(testRollback){
MockTxOp::shared_ptr opB(new MockTxOp());
opB->expectPrepare().expectRollback();
- DtxBuffer::shared_ptr bufferA(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer());
bufferA->enlist(static_pointer_cast<TxOp>(opA));
bufferA->markEnded();
- DtxBuffer::shared_ptr bufferB(new DtxBuffer());
+ boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer());
bufferB->enlist(static_pointer_cast<TxOp>(opB));
bufferB->markEnded();
diff --git a/cpp/src/tests/TransactionObserverTest.cpp b/cpp/src/tests/TransactionObserverTest.cpp
index fd1c331ae7..2a7d94b1ae 100644
--- a/cpp/src/tests/TransactionObserverTest.cpp
+++ b/cpp/src/tests/TransactionObserverTest.cpp
@@ -78,7 +78,7 @@ struct MockBrokerObserver : public BrokerObserver {
MockBrokerObserver(bool prep_=true) : prep(prep_) {}
- void startTx(const shared_ptr<TxBuffer>& buffer) {
+ void startTx(const intrusive_ptr<TxBuffer>& buffer) {
tx.reset(new MockTransactionObserver(prep));
buffer->setObserver(tx);
}