diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 41 |
1 files changed, 32 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index 096478faad..0c06350c02 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -61,6 +61,7 @@ Channel::Channel( prefetchCount(0), framesize(_framesize), tagGenerator("sgen"), + dtxSelected(false), accumulatedAck(0), store(_store), messageBuilder(this, _store, _stagingThreshold), @@ -103,6 +104,9 @@ void Channel::cancel(const string& tag){ void Channel::close(){ opened = false; consumers.clear(); + if (dtxBuffer.get()) { + dtxBuffer->fail(); + } recover(true); } @@ -123,22 +127,41 @@ void Channel::rollback(){ accumulatedAck.clear(); } -void Channel::startDtx(const std::string& xid, DtxManager& mgr){ +void Channel::selectDtx(){ + dtxSelected = true; +} + +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ + if (!dtxSelected) { + throw ConnectionException(503, "Channel has not been selected for use with dtx"); + } dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer(xid)); txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer); - mgr.start(xid, dtxBuffer); + if (join) { + mgr.join(xid, dtxBuffer); + } else { + mgr.start(xid, dtxBuffer); + } } -void Channel::endDtx(const std::string& xid){ +void Channel::endDtx(const std::string& xid, bool fail){ + if (!dtxBuffer) { + throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); + } if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on end") % dtxBuffer->getXid() % xid); } - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); - dtxBuffer->markEnded(); + if (fail) { + accumulatedAck.clear(); + dtxBuffer->fail(); + } else { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + dtxBuffer->markEnded(); + } dtxBuffer.reset(); txBuffer.reset(); @@ -250,7 +273,7 @@ void Channel::complete(Message::shared_ptr msg) { Exchange::shared_ptr exchange = connection.broker.getExchanges().get(msg->getExchange()); assert(exchange.get()); - if (txBuffer) { + if (txBuffer.get()) { TxPublish* deliverable(new TxPublish(msg)); TxOp::shared_ptr op(deliverable); exchange->route(*deliverable, msg->getRoutingKey(), @@ -276,7 +299,7 @@ void Channel::ack(uint64_t deliveryTag, bool multiple){ } void Channel::ack(uint64_t firstTag, uint64_t lastTag){ - if (txBuffer) { + if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... |
