summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp41
1 files changed, 32 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index 096478faad..0c06350c02 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -61,6 +61,7 @@ Channel::Channel(
prefetchCount(0),
framesize(_framesize),
tagGenerator("sgen"),
+ dtxSelected(false),
accumulatedAck(0),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
@@ -103,6 +104,9 @@ void Channel::cancel(const string& tag){
void Channel::close(){
opened = false;
consumers.clear();
+ if (dtxBuffer.get()) {
+ dtxBuffer->fail();
+ }
recover(true);
}
@@ -123,22 +127,41 @@ void Channel::rollback(){
accumulatedAck.clear();
}
-void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+void Channel::selectDtx(){
+ dtxSelected = true;
+}
+
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+ if (!dtxSelected) {
+ throw ConnectionException(503, "Channel has not been selected for use with dtx");
+ }
dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid));
txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
- mgr.start(xid, dtxBuffer);
+ if (join) {
+ mgr.join(xid, dtxBuffer);
+ } else {
+ mgr.start(xid, dtxBuffer);
+ }
}
-void Channel::endDtx(const std::string& xid){
+void Channel::endDtx(const std::string& xid, bool fail){
+ if (!dtxBuffer) {
+ throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
+ }
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end")
% dtxBuffer->getXid() % xid);
}
- TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
- accumulatedAck.clear();
- dtxBuffer->enlist(txAck);
- dtxBuffer->markEnded();
+ if (fail) {
+ accumulatedAck.clear();
+ dtxBuffer->fail();
+ } else {
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
+ dtxBuffer->enlist(txAck);
+ dtxBuffer->markEnded();
+ }
dtxBuffer.reset();
txBuffer.reset();
@@ -250,7 +273,7 @@ void Channel::complete(Message::shared_ptr msg) {
Exchange::shared_ptr exchange =
connection.broker.getExchanges().get(msg->getExchange());
assert(exchange.get());
- if (txBuffer) {
+ if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
exchange->route(*deliverable, msg->getRoutingKey(),
@@ -276,7 +299,7 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){
}
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
- if (txBuffer) {
+ if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...