diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 31 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxBuffer.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxHandlerImpl.cpp | 41 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxWorkRecord.h | 5 |
7 files changed, 88 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index e4ff098c8e..235f320cb7 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -26,6 +26,7 @@ #include <functional> #include <boost/bind.hpp> +#include <boost/format.hpp> #include "BrokerChannel.h" #include "qpid/framing/ChannelAdapter.h" @@ -121,12 +122,17 @@ void Channel::rollback(){ } void Channel::startDtx(const std::string& xid, DtxManager& mgr){ - dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer()); + dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); mgr.start(xid, dtxBuffer); } -void Channel::endDtx(){ +void Channel::endDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") + % dtxBuffer->getXid() % xid); + } + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); @@ -135,6 +141,27 @@ void Channel::endDtx(){ txBuffer.reset(); } +void Channel::suspendDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") + % dtxBuffer->getXid() % xid); + } + dtxBuffer->setSuspended(true); + txBuffer.reset(); +} + +void Channel::resumeDtx(const std::string& xid){ + if (dtxBuffer->getXid() != xid) { + throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") + % dtxBuffer->getXid() % xid); + } + if (!dtxBuffer->isSuspended()) { + throw ConnectionException(503, boost::format("xid %1% not suspended")% xid); + } + dtxBuffer->setSuspended(true); + txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index 4749ef6b5a..1d0093cf82 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -138,7 +138,9 @@ class Channel : public framing::ChannelAdapter, void commit(); void rollback(); void startDtx(const std::string& xid, DtxManager& mgr); - void endDtx(); + void endDtx(const std::string& xid); + void suspendDtx(const std::string& xid); + void resumeDtx(const std::string& xid); void ack(); void ack(uint64_t deliveryTag, bool multiple); void ack(uint64_t deliveryTag, uint64_t endTag); diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp index bdc326593a..2ffe744293 100644 --- a/cpp/src/qpid/broker/DtxBuffer.cpp +++ b/cpp/src/qpid/broker/DtxBuffer.cpp @@ -23,7 +23,7 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer() : ended(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {} DtxBuffer::~DtxBuffer() {} @@ -38,3 +38,19 @@ bool DtxBuffer::isEnded() Mutex::ScopedLock locker(lock); return ended; } + +void DtxBuffer::setSuspended(bool isSuspended) +{ + suspended = isSuspended; +} + +bool DtxBuffer::isSuspended() +{ + return suspended; +} + +const std::string& DtxBuffer::getXid() +{ + return xid; +} + diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h index 15970ccff0..41be9309e8 100644 --- a/cpp/src/qpid/broker/DtxBuffer.h +++ b/cpp/src/qpid/broker/DtxBuffer.h @@ -28,14 +28,19 @@ namespace qpid { namespace broker { class DtxBuffer : public TxBuffer{ sys::Mutex lock; + const std::string xid; bool ended; + bool suspended; public: typedef boost::shared_ptr<DtxBuffer> shared_ptr; - DtxBuffer(); + DtxBuffer(const std::string& xid = ""); ~DtxBuffer(); void markEnded(); bool isEnded(); + void setSuspended(bool suspended); + bool isSuspended(); + const std::string& getXid(); }; } } diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp index 06b69bc20a..1c3fce9cdb 100644 --- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp +++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp @@ -17,6 +17,7 @@ */ #include "DtxHandlerImpl.h" +#include <boost/format.hpp> #include "Broker.h" #include "BrokerChannel.h" @@ -30,18 +31,6 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {} // DtxDemarcationHandler: -void DtxHandlerImpl::end(const MethodContext& /*context*/, - u_int16_t /*ticket*/, - const string& /*xid*/, - bool /*fail*/, - bool /*suspend*/ ) -{ - channel.endDtx(); - //send end-ok - //TODO: handle fail and suspend - //TODO: check xid is as expected? -} - void DtxHandlerImpl::select(const MethodContext& /*context*/ ) { @@ -49,16 +38,38 @@ void DtxHandlerImpl::select(const MethodContext& /*context*/ ) //send select-ok } +void DtxHandlerImpl::end(const MethodContext& /*context*/, + u_int16_t /*ticket*/, + const string& xid, + bool fail, + bool suspend) +{ + if (fail && suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } + + //TODO: handle fail + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid); + } + //send end-ok +} void DtxHandlerImpl::start(const MethodContext& /*context*/, u_int16_t /*ticket*/, const string& xid, bool /*join*/, - bool /*resume*/ ) + bool resume) { - channel.startDtx(xid, broker.getDtxManager()); + //TODO: handle join + if (resume) { + channel.resumeDtx(xid); + } else { + channel.startDtx(xid, broker.getDtxManager()); + } //send start-ok - //TODO: handle join and resume } // DtxCoordinationHandler: diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp index 5e31312a8e..218131f6bc 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -25,7 +25,7 @@ using boost::mem_fn; using namespace qpid::broker; -DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {} +DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {} DtxWorkRecord::~DtxWorkRecord() {} @@ -65,6 +65,7 @@ void DtxWorkRecord::commit() std::auto_ptr<TransactionContext> localtxn = store->begin(); if (prepare(localtxn.get())) { store->commit(*localtxn); + for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit)); } else { store->abort(*localtxn); abort(); @@ -103,5 +104,4 @@ void DtxWorkRecord::abort() txn.reset(); } for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback)); - } diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h index 8ad4596963..18b41c7808 100644 --- a/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/cpp/src/qpid/broker/DtxWorkRecord.h @@ -31,6 +31,11 @@ namespace qpid { namespace broker { +/** + * Represents the work done under a particular distributed transaction + * across potentially multiple channels. Identified by a xid. Allows + * that work to be prepared, committed and rolled-back. + */ class DtxWorkRecord { typedef std::vector<DtxBuffer::shared_ptr> Work; |
