diff options
author | Alan Conway <aconway@apache.org> | 2013-10-29 15:23:36 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2013-10-29 15:23:36 +0000 |
commit | 4a3eb469d5eb8c97c719c70e42814cd703e12fbd (patch) | |
tree | 5fefbf7b5c2f327275c8bd929ed83281ec54114d | |
parent | fd23db50312860de7585001588fae44b4f8e9480 (diff) | |
download | qpid-python-4a3eb469d5eb8c97c719c70e42814cd703e12fbd.tar.gz |
QPID-5139: Make TxBuffer inherit from AsyncCompletion.
Switched from shared_ptr to intrusive_ptr for TxBuffer and DtxBuffer.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1536752 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/broker/BrokerObserver.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/BrokerObservers.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoverableMessageImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/RecoveryManagerImpl.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 14 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TxBuffer.h | 32 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/ha/TxReplicator.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/DtxWorkRecordTest.cpp | 24 | ||||
-rw-r--r-- | cpp/src/tests/TransactionObserverTest.cpp | 2 |
17 files changed, 76 insertions, 71 deletions
diff --git a/cpp/src/qpid/broker/BrokerObserver.h b/cpp/src/qpid/broker/BrokerObserver.h index 2249d33e64..a9573b9e12 100644 --- a/cpp/src/qpid/broker/BrokerObserver.h +++ b/cpp/src/qpid/broker/BrokerObserver.h @@ -60,8 +60,8 @@ class BrokerObserver const boost::shared_ptr<Queue>& , const std::string& /*key*/, const framing::FieldTable& /*args*/) {} - virtual void startTx(const boost::shared_ptr<TxBuffer>&) {} - virtual void startDtx(const boost::shared_ptr<DtxBuffer>&) {} + virtual void startTx(const boost::intrusive_ptr<TxBuffer>&) {} + virtual void startDtx(const boost::intrusive_ptr<DtxBuffer>&) {} }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/BrokerObservers.h b/cpp/src/qpid/broker/BrokerObservers.h index 5ba7bac890..8a9bdd4fd4 100644 --- a/cpp/src/qpid/broker/BrokerObservers.h +++ b/cpp/src/qpid/broker/BrokerObservers.h @@ -64,10 +64,10 @@ class BrokerObservers : public BrokerObserver, each(boost::bind( &BrokerObserver::unbind, _1, exchange, queue, key, args)); } - void startTx(const boost::shared_ptr<TxBuffer>& tx) { + void startTx(const boost::intrusive_ptr<TxBuffer>& tx) { each(boost::bind(&BrokerObserver::startTx, _1, tx)); } - void startDtx(const boost::shared_ptr<DtxBuffer>& dtx) { + void startDtx(const boost::intrusive_ptr<DtxBuffer>& dtx) { each(boost::bind(&BrokerObserver::startDtx, _1, dtx)); } }; diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h index cabd37647a..21ca76c8f4 100644 --- a/cpp/src/qpid/broker/DtxBuffer.h +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -27,7 +27,8 @@ namespace qpid { namespace broker { -class DtxBuffer : public TxBuffer{ +class DtxBuffer : public TxBuffer { + mutable sys::Mutex lock; const std::string xid; bool ended; @@ -36,8 +37,6 @@ class DtxBuffer : public TxBuffer{ bool expired; public: - typedef boost::shared_ptr<DtxBuffer> shared_ptr; - QPID_BROKER_EXTERN DtxBuffer( const std::string& xid = "", bool ended=false, bool suspended=false, bool failed=false, bool expired=false); diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index 5233e07b2b..5ba1ce4dac 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -66,17 +66,17 @@ DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} DtxManager::~DtxManager() {} -void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) +void DtxManager::start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> ops) { createWork(xid)->add(ops); } -void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops) +void DtxManager::join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> ops) { getWork(xid)->add(ops); } -void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops) +void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> ops) { createWork(xid)->recover(txn, ops); } diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index 81175e5dc3..000fc7b4b8 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -51,9 +51,9 @@ class DtxManager{ public: DtxManager(sys::Timer&); ~DtxManager(); - void start(const std::string& xid, DtxBuffer::shared_ptr work); - void join(const std::string& xid, DtxBuffer::shared_ptr work); - void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work); + void start(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work); + void join(const std::string& xid, boost::intrusive_ptr<DtxBuffer> work); + void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> work); bool prepare(const std::string& xid); bool commit(const std::string& xid, bool onePhase); void rollback(const std::string& xid); diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index ad02892895..04d19dc43e 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -122,7 +122,7 @@ void DtxWorkRecord::rollback() abort(); } -void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) +void DtxWorkRecord::add(boost::intrusive_ptr<DtxBuffer> ops) { Mutex::ScopedLock locker(lock); if (expired) { @@ -162,7 +162,7 @@ void DtxWorkRecord::abort() std::for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); } -void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, DtxBuffer::shared_ptr ops) +void DtxWorkRecord::recover(std::auto_ptr<TPCTransactionContext> _txn, boost::intrusive_ptr<DtxBuffer> ops) { add(ops); txn = _txn; diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index b38af907c5..91fda797d1 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -46,7 +46,7 @@ struct DtxTimeout; */ class DtxWorkRecord { - typedef std::vector<DtxBuffer::shared_ptr> Work; + typedef std::vector<boost::intrusive_ptr<DtxBuffer> >Work; const std::string xid; TransactionalStore* const store; @@ -69,8 +69,8 @@ public: QPID_BROKER_EXTERN bool prepare(); QPID_BROKER_EXTERN bool commit(bool onePhase); QPID_BROKER_EXTERN void rollback(); - QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops); - void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); + QPID_BROKER_EXTERN void add(boost::intrusive_ptr<DtxBuffer> ops); + void recover(std::auto_ptr<TPCTransactionContext> txn, boost::intrusive_ptr<DtxBuffer> ops); void timedout(); void setTimeout(boost::intrusive_ptr<DtxTimeout> t); boost::intrusive_ptr<DtxTimeout> getTimeout(); diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h index 900eae4e1e..6f0bd8e4a9 100644 --- a/cpp/src/qpid/broker/RecoverableMessageImpl.h +++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h @@ -41,8 +41,8 @@ public: bool loadContent(uint64_t available); void decodeContent(framing::Buffer& buffer); void recover(boost::shared_ptr<Queue> queue); - void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); - void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); + void enqueue(boost::intrusive_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); + void dequeue(boost::intrusive_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); Message getMessage(); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index 4bfdfe76f4..19cb2f30c3 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -60,8 +60,8 @@ public: const QueueSettings& getSettings() const; void addArgument(const std::string& key, const types::Variant& value); void recover(RecoverableMessage::shared_ptr msg); - void enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); - void dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr msg); + void enqueue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr msg); + void dequeue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr msg); }; @@ -88,9 +88,9 @@ public: class RecoverableTransactionImpl : public RecoverableTransaction { - DtxBuffer::shared_ptr buffer; + boost::intrusive_ptr<DtxBuffer> buffer; public: - RecoverableTransactionImpl(DtxBuffer::shared_ptr _buffer) : buffer(_buffer) {} + RecoverableTransactionImpl(boost::intrusive_ptr<DtxBuffer> _buffer) : buffer(_buffer) {} void enqueue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); void dequeue(RecoverableQueue::shared_ptr queue, RecoverableMessage::shared_ptr message); }; @@ -129,7 +129,7 @@ RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buff RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn) { - DtxBuffer::shared_ptr buffer(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> buffer(new DtxBuffer()); dtxMgr.recover(xid, txn, buffer); return RecoverableTransaction::shared_ptr(new RecoverableTransactionImpl(buffer)); } @@ -255,22 +255,22 @@ void RecoverableExchangeImpl::bind(const string& queueName, queue->bound(exchange->getName(), key, args); } -void RecoverableMessageImpl::dequeue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +void RecoverableMessageImpl::dequeue(boost::intrusive_ptr<DtxBuffer> buffer, Queue::shared_ptr queue) { buffer->enlist(TxOp::shared_ptr(new RecoveredDequeue(queue, msg))); } -void RecoverableMessageImpl::enqueue(DtxBuffer::shared_ptr buffer, Queue::shared_ptr queue) +void RecoverableMessageImpl::enqueue(boost::intrusive_ptr<DtxBuffer> buffer, Queue::shared_ptr queue) { buffer->enlist(TxOp::shared_ptr(new RecoveredEnqueue(queue, msg))); } -void RecoverableQueueImpl::dequeue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +void RecoverableQueueImpl::dequeue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr message) { dynamic_pointer_cast<RecoverableMessageImpl>(message)->dequeue(buffer, queue); } -void RecoverableQueueImpl::enqueue(DtxBuffer::shared_ptr buffer, RecoverableMessage::shared_ptr message) +void RecoverableQueueImpl::enqueue(boost::intrusive_ptr<DtxBuffer> buffer, RecoverableMessage::shared_ptr message) { dynamic_pointer_cast<RecoverableMessageImpl>(message)->enqueue(buffer, queue); } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 18c29c09f9..469fe760a0 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -166,7 +166,7 @@ bool SemanticState::cancel(const string& tag) void SemanticState::startTx() { - txBuffer = TxBuffer::shared_ptr(new TxBuffer()); + txBuffer = boost::intrusive_ptr<TxBuffer>(new TxBuffer()); session.getBroker().getBrokerObservers().startTx(txBuffer); session.startTx(); //just to update statistics } diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 3105398426..4375a3f0f1 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -84,7 +84,7 @@ class SemanticState : private boost::noncopyable { public: typedef SemanticStateConsumerImpl ConsumerImpl; - typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; + typedef std::map<std::string, boost::intrusive_ptr<DtxBuffer> > DtxBufferMap; private: typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap; @@ -95,8 +95,8 @@ class SemanticState : private boost::noncopyable { ConsumerImplMap consumers; NameGenerator tagGenerator; DeliveryRecords unacked; - TxBuffer::shared_ptr txBuffer; - DtxBuffer::shared_ptr dtxBuffer; + boost::intrusive_ptr<TxBuffer> txBuffer; + boost::intrusive_ptr<DtxBuffer> dtxBuffer; bool dtxSelected; DtxBufferMap suspendedXids; framing::SequenceSet accumulatedAck; @@ -179,10 +179,10 @@ class SemanticState : private boost::noncopyable { DeliveryRecords& getUnacked() { return unacked; } framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; } - TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; } - DtxBuffer::shared_ptr getDtxBuffer() const { return dtxBuffer; } - void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; } - void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } + boost::intrusive_ptr<TxBuffer> getTxBuffer() const { return txBuffer; } + boost::intrusive_ptr<DtxBuffer> getDtxBuffer() const { return dtxBuffer; } + void setTxBuffer(const boost::intrusive_ptr<TxBuffer>& txb) { txBuffer = txb; } + void setDtxBuffer(const boost::intrusive_ptr<DtxBuffer>& dtxb) { dtxBuffer = dtxb; txBuffer = dtxb; } void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; } void record(const DeliveryRecord& delivery); DtxBufferMap& getSuspendedXids() { return suspendedXids; } diff --git a/cpp/src/qpid/broker/TxBuffer.h b/cpp/src/qpid/broker/TxBuffer.h index 4570c570a2..f4cdcb3ae2 100644 --- a/cpp/src/qpid/broker/TxBuffer.h +++ b/cpp/src/qpid/broker/TxBuffer.h @@ -1,3 +1,6 @@ +#ifndef QPID_BROKER_TXBUFFER_H +#define QPID_BROKER_TXBUFFER_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,15 +21,19 @@ * under the License. * */ -#ifndef _TxBuffer_ -#define _TxBuffer_ -#include <algorithm> -#include <functional> -#include <vector> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/broker/TxOp.h" +#include "qpid/broker/AsyncCompletion.h" +#include <algorithm> +#include <functional> +#include <vector> + + +namespace qpid { +namespace broker { +class TransactionObserver; /** * Represents a single transaction. As such, an instance of this class @@ -56,21 +63,17 @@ * prepare. There is a little more flexibility with (2) but any * changes made during prepare should be subject to the control of the * TransactionalStore in use. + * + * TxBuffer inherits AsyncCompletion because transactions can be completed + * asynchronously if the broker is part of a HA cluster. */ -namespace qpid { - -namespace broker { -class TransactionObserver; - -class TxBuffer { +class TxBuffer : public AsyncCompletion { private: typedef std::vector<TxOp::shared_ptr>::iterator op_iterator; std::vector<TxOp::shared_ptr> ops; boost::shared_ptr<TransactionObserver> observer; public: - typedef boost::shared_ptr<TxBuffer> shared_ptr; - QPID_BROKER_EXTERN TxBuffer(); /** @@ -128,4 +131,5 @@ class TxBuffer { }} // namespace qpid::broker -#endif + +#endif /*!QPID_BROKER_TXBUFFER_H*/ diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index e7fdc5035f..3a7ab3b0fc 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -40,12 +40,14 @@ #include "qpid/sys/Timer.h" #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> namespace qpid { namespace ha { using sys::Mutex; using boost::shared_ptr; +using boost::intrusive_ptr; using namespace std; using namespace framing; @@ -69,8 +71,8 @@ class PrimaryBrokerObserver : public broker::BrokerObserver void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } - void startTx(const shared_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); } - void startDtx(const shared_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); } + void startTx(const intrusive_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); } + void startDtx(const intrusive_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); } private: Primary& primary; @@ -406,11 +408,11 @@ shared_ptr<PrimaryTxObserver> Primary::makeTxObserver() { return observer; } -void Primary::startTx(const boost::shared_ptr<broker::TxBuffer>& tx) { +void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& tx) { tx->setObserver(makeTxObserver()); } -void Primary::startDtx(const boost::shared_ptr<broker::DtxBuffer>& dtx) { +void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& dtx) { dtx->setObserver(makeTxObserver()); } diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index d1350ab261..97b7f956b7 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -93,8 +93,8 @@ class Primary : public Role void queueDestroy(const QueuePtr&); void exchangeCreate(const ExchangePtr&); void exchangeDestroy(const ExchangePtr&); - void startTx(const boost::shared_ptr<broker::TxBuffer>&); - void startDtx(const boost::shared_ptr<broker::DtxBuffer>&); + void startTx(const boost::intrusive_ptr<broker::TxBuffer>&); + void startDtx(const boost::intrusive_ptr<broker::DtxBuffer>&); // Called via ConnectionObserver void opened(broker::Connection& connection); diff --git a/cpp/src/qpid/ha/TxReplicator.h b/cpp/src/qpid/ha/TxReplicator.h index 214b2a8a5f..bd80443e50 100644 --- a/cpp/src/qpid/ha/TxReplicator.h +++ b/cpp/src/qpid/ha/TxReplicator.h @@ -89,7 +89,7 @@ class TxReplicator : public QueueReplicator { std::string logPrefix; TxEnqueueEvent enq; // Enqueue data for next deliver. - boost::shared_ptr<broker::TxBuffer> txBuffer; + boost::intrusive_ptr<broker::TxBuffer> txBuffer; broker::MessageStore* store; std::auto_ptr<broker::TransactionContext> context; framing::ChannelId channel; // Channel to send prepare-complete. diff --git a/cpp/src/tests/DtxWorkRecordTest.cpp b/cpp/src/tests/DtxWorkRecordTest.cpp index 9d7666dca4..bcb3fc14a1 100644 --- a/cpp/src/tests/DtxWorkRecordTest.cpp +++ b/cpp/src/tests/DtxWorkRecordTest.cpp @@ -41,10 +41,10 @@ QPID_AUTO_TEST_CASE(testOnePhaseCommit){ MockTxOp::shared_ptr opB(new MockTxOp()); opB->expectPrepare().expectCommit(); - DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer()); bufferA->enlist(static_pointer_cast<TxOp>(opA)); bufferA->markEnded(); - DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer()); bufferB->enlist(static_pointer_cast<TxOp>(opB)); bufferB->markEnded(); @@ -71,13 +71,13 @@ QPID_AUTO_TEST_CASE(testFailOnOnePhaseCommit){ MockTxOp::shared_ptr opC(new MockTxOp()); opC->expectRollback(); - DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer()); bufferA->enlist(static_pointer_cast<TxOp>(opA)); bufferA->markEnded(); - DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer()); bufferB->enlist(static_pointer_cast<TxOp>(opB)); bufferB->markEnded(); - DtxBuffer::shared_ptr bufferC(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferC(new DtxBuffer()); bufferC->enlist(static_pointer_cast<TxOp>(opC)); bufferC->markEnded(); @@ -105,10 +105,10 @@ QPID_AUTO_TEST_CASE(testTwoPhaseCommit){ MockTxOp::shared_ptr opB(new MockTxOp()); opB->expectPrepare().expectCommit(); - DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer()); bufferA->enlist(static_pointer_cast<TxOp>(opA)); bufferA->markEnded(); - DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer()); bufferB->enlist(static_pointer_cast<TxOp>(opB)); bufferB->markEnded(); @@ -136,13 +136,13 @@ QPID_AUTO_TEST_CASE(testFailOnTwoPhaseCommit){ MockTxOp::shared_ptr opC(new MockTxOp()); opC->expectRollback(); - DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer()); bufferA->enlist(static_pointer_cast<TxOp>(opA)); bufferA->markEnded(); - DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer()); bufferB->enlist(static_pointer_cast<TxOp>(opB)); bufferB->markEnded(); - DtxBuffer::shared_ptr bufferC(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferC(new DtxBuffer()); bufferC->enlist(static_pointer_cast<TxOp>(opC)); bufferC->markEnded(); @@ -168,10 +168,10 @@ QPID_AUTO_TEST_CASE(testRollback){ MockTxOp::shared_ptr opB(new MockTxOp()); opB->expectPrepare().expectRollback(); - DtxBuffer::shared_ptr bufferA(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferA(new DtxBuffer()); bufferA->enlist(static_pointer_cast<TxOp>(opA)); bufferA->markEnded(); - DtxBuffer::shared_ptr bufferB(new DtxBuffer()); + boost::intrusive_ptr<DtxBuffer> bufferB(new DtxBuffer()); bufferB->enlist(static_pointer_cast<TxOp>(opB)); bufferB->markEnded(); diff --git a/cpp/src/tests/TransactionObserverTest.cpp b/cpp/src/tests/TransactionObserverTest.cpp index fd1c331ae7..2a7d94b1ae 100644 --- a/cpp/src/tests/TransactionObserverTest.cpp +++ b/cpp/src/tests/TransactionObserverTest.cpp @@ -78,7 +78,7 @@ struct MockBrokerObserver : public BrokerObserver { MockBrokerObserver(bool prep_=true) : prep(prep_) {} - void startTx(const shared_ptr<TxBuffer>& buffer) { + void startTx(const intrusive_ptr<TxBuffer>& buffer) { tx.reset(new MockTransactionObserver(prep)); buffer->setObserver(tx); } |