| author | Gordon Sim <gsim@apache.org> | 2007-04-19 17:56:21 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-04-19 17:56:21 +0000 |
| commit | b1ad015fe2670bc3e5471c5e350e243cca948dcf (patch) | |
| tree | cbbae911b59a34f7cbe998609ca9d14f8984ca37 /cpp/src/qpid/broker/BrokerChannel.cpp | |
| parent | e7cc3594288f5a6ed6c6565e34413823f5b8e2d8 (diff) | |
| download | qpid-python-b1ad015fe2670bc3e5471c5e350e243cca948dcf.tar.gz | |
Some dtx related updates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530500 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 66 |
1 files changed, 32 insertions, 34 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index afbbed5c29..e4ff098c8e 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -54,7 +54,6 @@ Channel::Channel(
     ChannelAdapter(id, &con.getOutput(), con.getVersion()),
     connection(con),
     currentDeliveryTag(1),
-    transactional(false),
     prefetchSize(0),
     prefetchCount(0),
     framesize(_framesize),
@@ -104,24 +103,38 @@ void Channel::close(){
     recover(true);
 }
 
-void Channel::begin(){
-    transactional = true;
+void Channel::startTx(){
+    txBuffer = TxBuffer::shared_ptr(new TxBuffer());
 }
 
 void Channel::commit(){
     TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
-    txBuffer.enlist(txAck);
-    if(txBuffer.prepare(store)){
-        txBuffer.commit();
+    txBuffer->enlist(txAck);
+    if (txBuffer->commitLocal(store)) {
+        accumulatedAck.clear();
     }
-    accumulatedAck.clear();
 }
 
 void Channel::rollback(){
-    txBuffer.rollback();
+    txBuffer->rollback();
     accumulatedAck.clear();
 }
 
+void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+    dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+    txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+    mgr.start(xid, dtxBuffer);
+}
+
+void Channel::endDtx(){
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    dtxBuffer->enlist(txAck);
+    dtxBuffer->markEnded();
+
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
 void Channel::deliver(
     Message::shared_ptr& msg, const string& consumerTag,
     Queue::shared_ptr& queue, bool ackExpected)
@@ -180,23 +193,8 @@ void Channel::ConsumerImpl::requestDispatch(){
     queue->dispatch();
 }
 
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
-    Exchange::shared_ptr exchange =
-        connection.broker.getExchanges().get(msg->getExchange());
-    if(transactional){
-        TxPublish* deliverable(new TxPublish(msg));
-        TxOp::shared_ptr op(deliverable);
-        exchange->route(
-            *deliverable, msg->getRoutingKey(),
-            &(msg->getApplicationHeaders()));
-        txBuffer.enlist(op);
-    }else{
-        DeliverableMessage deliverable(msg);
-        exchange->route(
-            deliverable, msg->getRoutingKey(),
-            &(msg->getApplicationHeaders()));
-    }
+void Channel::handleInlineTransfer(Message::shared_ptr msg){
+    complete(msg);
 }
 
 void Channel::handlePublish(Message* _message){
@@ -222,12 +220,12 @@ void Channel::complete(Message::shared_ptr msg) {
     Exchange::shared_ptr exchange =
         connection.broker.getExchanges().get(msg->getExchange());
     assert(exchange.get());
-    if(transactional) {
+    if (txBuffer) {
         TxPublish* deliverable(new TxPublish(msg));
         TxOp::shared_ptr op(deliverable);
         exchange->route(*deliverable, msg->getRoutingKey(),
                         &(msg->getApplicationHeaders()));
-        txBuffer.enlist(op);
+        txBuffer->enlist(op);
     } else {
         DeliverableMessage deliverable(msg);
         exchange->route(deliverable, msg->getRoutingKey(),
@@ -236,24 +234,24 @@ void Channel::complete(Message::shared_ptr msg) {
 }
 
 void Channel::ack(){
-    ack(getFirstAckRequest(), getLastAckRequest());
+    ack(getFirstAckRequest(), getLastAckRequest());
 }
 
 // Used by Basic
 void Channel::ack(uint64_t deliveryTag, bool multiple){
-    if (multiple)
-        ack(0, deliveryTag);
-    else
-        ack(deliveryTag, deliveryTag);
+    if (multiple)
+        ack(0, deliveryTag);
+    else
+        ack(deliveryTag, deliveryTag);
 }
 
 void Channel::ack(uint64_t firstTag, uint64_t lastTag){
-    if(transactional){
+    if (txBuffer) {
         accumulatedAck.update(firstTag, lastTag);
         //TODO: I think the outstanding prefetch size & count should be updated at this point...
         //TODO: ...this may then necessitate dispatching to consumers
-    }else{
+    } else {
         Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
         ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
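The core of this change is that the channel stops tracking a boolean `transactional` flag and instead treats a non-null `txBuffer` shared pointer as "this channel is in a transaction"; the same pointer can later hold a `DtxBuffer` for the distributed-transaction (dtx) path added by `startDtx`/`endDtx`. The following standalone sketch illustrates that pointer-presence pattern only; the `MiniTxBuffer`/`MiniChannel` names and the `publish`/`commitLocal` bodies are illustrative assumptions, not qpid code.

```cpp
// Minimal sketch of the pattern: a shared_ptr replaces the bool flag,
// and "pointer is set" means "buffer operations instead of acting immediately".
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct MiniTxBuffer {
    std::vector<std::string> ops;                  // enlisted operations
    void enlist(const std::string& op) { ops.push_back(op); }
    bool commitLocal() {                           // prepare + commit in one step
        std::cout << "committing " << ops.size() << " op(s)\n";
        ops.clear();
        return true;
    }
};

class MiniChannel {
    std::shared_ptr<MiniTxBuffer> txBuffer;        // null => non-transactional
public:
    void startTx() { txBuffer = std::make_shared<MiniTxBuffer>(); }
    void publish(const std::string& msg) {
        if (txBuffer) txBuffer->enlist(msg);       // buffered until commit
        else std::cout << "delivered immediately: " << msg << "\n";
    }
    void commit() {
        if (txBuffer && txBuffer->commitLocal()) { /* acks would be cleared here */ }
    }
};

int main() {
    MiniChannel ch;
    ch.publish("a");   // no tx: delivered immediately
    ch.startTx();
    ch.publish("b");   // buffered
    ch.commit();       // prints "committing 1 op(s)"
}
```

The same pointer-presence test is what replaces `if(transactional)` with `if (txBuffer)` in `Channel::complete` and `Channel::ack` in the diff above.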
