summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp31
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.h4
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.cpp18
-rw-r--r--cpp/src/qpid/broker/DtxBuffer.h7
-rw-r--r--cpp/src/qpid/broker/DtxHandlerImpl.cpp41
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.cpp4
-rw-r--r--cpp/src/qpid/broker/DtxWorkRecord.h5
7 files changed, 88 insertions, 22 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index e4ff098c8e..235f320cb7 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -26,6 +26,7 @@
#include <functional>
#include <boost/bind.hpp>
+#include <boost/format.hpp>
#include "BrokerChannel.h"
#include "qpid/framing/ChannelAdapter.h"
@@ -121,12 +122,17 @@ void Channel::rollback(){
}
void Channel::startDtx(const std::string& xid, DtxManager& mgr){
- dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
mgr.start(xid, dtxBuffer);
}
-void Channel::endDtx(){
+void Channel::endDtx(const std::string& xid){
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
+ % dtxBuffer->getXid() % xid);
+ }
+
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
dtxBuffer->enlist(txAck);
dtxBuffer->markEnded();
@@ -135,6 +141,27 @@ void Channel::endDtx(){
txBuffer.reset();
}
+void Channel::suspendDtx(const std::string& xid){
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
+ % dtxBuffer->getXid() % xid);
+ }
+ dtxBuffer->setSuspended(true);
+ txBuffer.reset();
+}
+
+void Channel::resumeDtx(const std::string& xid){
+ if (dtxBuffer->getXid() != xid) {
+ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
+ % dtxBuffer->getXid() % xid);
+ }
+ if (!dtxBuffer->isSuspended()) {
+ throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
+ }
+ dtxBuffer->setSuspended(true);
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h
index 4749ef6b5a..1d0093cf82 100644
--- a/cpp/src/qpid/broker/BrokerChannel.h
+++ b/cpp/src/qpid/broker/BrokerChannel.h
@@ -138,7 +138,9 @@ class Channel : public framing::ChannelAdapter,
void commit();
void rollback();
void startDtx(const std::string& xid, DtxManager& mgr);
- void endDtx();
+ void endDtx(const std::string& xid);
+ void suspendDtx(const std::string& xid);
+ void resumeDtx(const std::string& xid);
void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
diff --git a/cpp/src/qpid/broker/DtxBuffer.cpp b/cpp/src/qpid/broker/DtxBuffer.cpp
index bdc326593a..2ffe744293 100644
--- a/cpp/src/qpid/broker/DtxBuffer.cpp
+++ b/cpp/src/qpid/broker/DtxBuffer.cpp
@@ -23,7 +23,7 @@
using namespace qpid::broker;
using qpid::sys::Mutex;
-DtxBuffer::DtxBuffer() : ended(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false) {}
DtxBuffer::~DtxBuffer() {}
@@ -38,3 +38,19 @@ bool DtxBuffer::isEnded()
Mutex::ScopedLock locker(lock);
return ended;
}
+
+void DtxBuffer::setSuspended(bool isSuspended)
+{
+ suspended = isSuspended;
+}
+
+bool DtxBuffer::isSuspended()
+{
+ return suspended;
+}
+
+const std::string& DtxBuffer::getXid()
+{
+ return xid;
+}
+
diff --git a/cpp/src/qpid/broker/DtxBuffer.h b/cpp/src/qpid/broker/DtxBuffer.h
index 15970ccff0..41be9309e8 100644
--- a/cpp/src/qpid/broker/DtxBuffer.h
+++ b/cpp/src/qpid/broker/DtxBuffer.h
@@ -28,14 +28,19 @@ namespace qpid {
namespace broker {
class DtxBuffer : public TxBuffer{
sys::Mutex lock;
+ const std::string xid;
bool ended;
+ bool suspended;
public:
typedef boost::shared_ptr<DtxBuffer> shared_ptr;
- DtxBuffer();
+ DtxBuffer(const std::string& xid = "");
~DtxBuffer();
void markEnded();
bool isEnded();
+ void setSuspended(bool suspended);
+ bool isSuspended();
+ const std::string& getXid();
};
}
}
diff --git a/cpp/src/qpid/broker/DtxHandlerImpl.cpp b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
index 06b69bc20a..1c3fce9cdb 100644
--- a/cpp/src/qpid/broker/DtxHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/DtxHandlerImpl.cpp
@@ -17,6 +17,7 @@
*/
#include "DtxHandlerImpl.h"
+#include <boost/format.hpp>
#include "Broker.h"
#include "BrokerChannel.h"
@@ -30,18 +31,6 @@ DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
// DtxDemarcationHandler:
-void DtxHandlerImpl::end(const MethodContext& /*context*/,
- u_int16_t /*ticket*/,
- const string& /*xid*/,
- bool /*fail*/,
- bool /*suspend*/ )
-{
- channel.endDtx();
- //send end-ok
- //TODO: handle fail and suspend
- //TODO: check xid is as expected?
-}
-
void DtxHandlerImpl::select(const MethodContext& /*context*/ )
{
@@ -49,16 +38,38 @@ void DtxHandlerImpl::select(const MethodContext& /*context*/ )
//send select-ok
}
+void DtxHandlerImpl::end(const MethodContext& /*context*/,
+ u_int16_t /*ticket*/,
+ const string& xid,
+ bool fail,
+ bool suspend)
+{
+ if (fail && suspend) {
+ throw ConnectionException(503, "End and suspend cannot both be set.");
+ }
+
+ //TODO: handle fail
+ if (suspend) {
+ channel.suspendDtx(xid);
+ } else {
+ channel.endDtx(xid);
+ }
+ //send end-ok
+}
void DtxHandlerImpl::start(const MethodContext& /*context*/,
u_int16_t /*ticket*/,
const string& xid,
bool /*join*/,
- bool /*resume*/ )
+ bool resume)
{
- channel.startDtx(xid, broker.getDtxManager());
+ //TODO: handle join
+ if (resume) {
+ channel.resumeDtx(xid);
+ } else {
+ channel.startDtx(xid, broker.getDtxManager());
+ }
//send start-ok
- //TODO: handle join and resume
}
// DtxCoordinationHandler:
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.cpp b/cpp/src/qpid/broker/DtxWorkRecord.cpp
index 5e31312a8e..218131f6bc 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.cpp
+++ b/cpp/src/qpid/broker/DtxWorkRecord.cpp
@@ -25,7 +25,7 @@ using boost::mem_fn;
using namespace qpid::broker;
-DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store) {}
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : xid(_xid), store(_store), completed(false) {}
DtxWorkRecord::~DtxWorkRecord() {}
@@ -65,6 +65,7 @@ void DtxWorkRecord::commit()
std::auto_ptr<TransactionContext> localtxn = store->begin();
if (prepare(localtxn.get())) {
store->commit(*localtxn);
+ for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
} else {
store->abort(*localtxn);
abort();
@@ -103,5 +104,4 @@ void DtxWorkRecord::abort()
txn.reset();
}
for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
-
}
diff --git a/cpp/src/qpid/broker/DtxWorkRecord.h b/cpp/src/qpid/broker/DtxWorkRecord.h
index 8ad4596963..18b41c7808 100644
--- a/cpp/src/qpid/broker/DtxWorkRecord.h
+++ b/cpp/src/qpid/broker/DtxWorkRecord.h
@@ -31,6 +31,11 @@
namespace qpid {
namespace broker {
+/**
+ * Represents the work done under a particular distributed transaction
+ * across potentially multiple channels. Identified by a xid. Allows
+ * that work to be prepared, committed and rolled-back.
+ */
class DtxWorkRecord
{
typedef std::vector<DtxBuffer::shared_ptr> Work;