summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-04-19 17:56:21 +0000
committerGordon Sim <gsim@apache.org>2007-04-19 17:56:21 +0000
commitb1ad015fe2670bc3e5471c5e350e243cca948dcf (patch)
treecbbae911b59a34f7cbe998609ca9d14f8984ca37 /cpp/src/qpid/broker/BrokerChannel.cpp
parente7cc3594288f5a6ed6c6565e34413823f5b8e2d8 (diff)
downloadqpid-python-b1ad015fe2670bc3e5471c5e350e243cca948dcf.tar.gz
Some dtx related updates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@530500 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp66
1 files changed, 32 insertions, 34 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index afbbed5c29..e4ff098c8e 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -54,7 +54,6 @@ Channel::Channel(
ChannelAdapter(id, &con.getOutput(), con.getVersion()),
connection(con),
currentDeliveryTag(1),
- transactional(false),
prefetchSize(0),
prefetchCount(0),
framesize(_framesize),
@@ -104,24 +103,38 @@ void Channel::close(){
recover(true);
}
-void Channel::begin(){
- transactional = true;
+void Channel::startTx(){
+ txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
void Channel::commit(){
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
- txBuffer.enlist(txAck);
- if(txBuffer.prepare(store)){
- txBuffer.commit();
+ txBuffer->enlist(txAck);
+ if (txBuffer->commitLocal(store)) {
+ accumulatedAck.clear();
}
- accumulatedAck.clear();
}
void Channel::rollback(){
- txBuffer.rollback();
+ txBuffer->rollback();
accumulatedAck.clear();
}
+void Channel::startDtx(const std::string& xid, DtxManager& mgr){
+ dtxBuffer = DtxBuffer::shared_ptr(new DtxBuffer());
+ txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
+ mgr.start(xid, dtxBuffer);
+}
+
+void Channel::endDtx(){
+ TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ dtxBuffer->enlist(txAck);
+ dtxBuffer->markEnded();
+
+ dtxBuffer.reset();
+ txBuffer.reset();
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
@@ -180,23 +193,8 @@ void Channel::ConsumerImpl::requestDispatch(){
queue->dispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- if(transactional){
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- exchange->route(
- *deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- txBuffer.enlist(op);
- }else{
- DeliverableMessage deliverable(msg);
- exchange->route(
- deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- }
+void Channel::handleInlineTransfer(Message::shared_ptr msg){
+ complete(msg);
}
void Channel::handlePublish(Message* _message){
@@ -222,12 +220,12 @@ void Channel::complete(Message::shared_ptr msg) {
Exchange::shared_ptr exchange =
connection.broker.getExchanges().get(msg->getExchange());
assert(exchange.get());
- if(transactional) {
+ if (txBuffer) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
exchange->route(*deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
- txBuffer.enlist(op);
+ txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
exchange->route(deliverable, msg->getRoutingKey(),
@@ -236,24 +234,24 @@ void Channel::complete(Message::shared_ptr msg) {
}
void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
+ ack(getFirstAckRequest(), getLastAckRequest());
}
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
- if (multiple)
- ack(0, deliveryTag);
- else
- ack(deliveryTag, deliveryTag);
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
}
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
- if(transactional){
+ if (txBuffer) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
- }else{
+ } else {
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));