diff options
| author | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-07-27 15:44:52 +0000 |
| commit | 80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch) | |
| tree | 13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/BrokerChannel.cpp | |
| parent | a9232d5a02a19f093f212cb0b76772a20b45cb1b (diff) | |
| download | qpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz | |
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses.
Some refactoring around message delivery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index a598717c5d..c50fbd5559 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -49,9 +49,10 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : +Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) : id(_id), connection(con), + out(_out), currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), @@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, +void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) @@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, token, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){ consumers.erase(i); } -void Channel::close(){ +void Channel::close() +{ opened = false; consumers.clear(); if (dtxBuffer.get()) { @@ -106,11 +108,15 @@ void Channel::close(){ recover(true); } -void Channel::startTx(){ +void Channel::startTx() +{ txBuffer = TxBuffer::shared_ptr(new TxBuffer()); } -void Channel::commit(){ +void Channel::commit() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked)); txBuffer->enlist(txAck); if (txBuffer->commitLocal(store)) { @@ -118,16 +124,21 @@ void Channel::commit(){ } } -void Channel::rollback(){ +void Channel::rollback() +{ + if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions"); + txBuffer->rollback(); accumulatedAck.clear(); } -void Channel::selectDtx(){ +void Channel::selectDtx() +{ dtxSelected = true; } -void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ +void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join) +{ if (!dtxSelected) { throw ConnectionException(503, "Channel has not been selected for use with dtx"); } @@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){ } } -void Channel::endDtx(const std::string& xid, bool fail){ +void Channel::endDtx(const std::string& xid, bool fail) +{ if (!dtxBuffer) { throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid); } @@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){ dtxBuffer.reset(); } -void Channel::suspendDtx(const std::string& xid){ +void Channel::suspendDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); @@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){ dtxBuffer->setSuspended(true); } -void Channel::resumeDtx(const std::string& xid){ +void Channel::resumeDtx(const std::string& xid) +{ if (dtxBuffer->getXid() != xid) { throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume") % dtxBuffer->getXid() % xid); @@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery) delivery.addTo(&outstanding); } -bool Channel::checkPrefetch(Message::shared_ptr& msg){ +bool Channel::checkPrefetch(Message::shared_ptr& msg) +{ Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token, const string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack - ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection), ackExpected(ack), blocked(false) {} -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) +{ if(!connection || connection != msg->getPublisher()){//check for no_local if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ blocked = true; @@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = false; Mutex::ScopedLock locker(parent->deliveryLock); - uint64_t deliveryTag = adapter->getNextDeliveryTag(); + uint64_t deliveryTag = parent->out.deliver(msg, token); if(ackExpected){ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); } - adapter->deliver(msg, deliveryTag); return true; } @@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); - adapter->deliver(msg, deliveryTag); + parent->out.redeliver(msg, token, deliveryTag); } Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } -void Channel::ConsumerImpl::cancel(){ +void Channel::ConsumerImpl::cancel() +{ if(queue) { queue->cancel(this); if (queue->canAutoDelete()) { @@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){ } } -void Channel::ConsumerImpl::requestDispatch(){ +void Channel::ConsumerImpl::requestDispatch() +{ if(blocked) queue->requestDispatch(); } -void Channel::handleInlineTransfer(Message::shared_ptr msg){ +void Channel::handleInlineTransfer(Message::shared_ptr msg) +{ complete(msg); } -void Channel::handlePublish(Message* _message){ +void Channel::handlePublish(Message* _message) +{ Message::shared_ptr message(_message); messageBuilder.initialise(message); } -void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ +void Channel::handleHeader(AMQHeaderBody::shared_ptr header) +{ messageBuilder.setHeader(header); //at this point, decide based on the size of the message whether we want //to stage it by saving content directly to disk as it arrives } -void Channel::handleContent(AMQContentBody::shared_ptr content){ +void Channel::handleContent(AMQContentBody::shared_ptr content) +{ messageBuilder.addContent(content); } @@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { } // Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple){ +void Channel::ack(uint64_t deliveryTag, bool multiple) +{ if (multiple) ack(0, deliveryTag); else ack(deliveryTag, deliveryTag); } -void Channel::ack(uint64_t firstTag, uint64_t lastTag){ +void Channel::ack(uint64_t firstTag, uint64_t lastTag) +{ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ } } -void Channel::recover(bool requeue){ +void Channel::recover(bool requeue) +{ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ @@ -368,12 +392,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ +bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) +{ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); - adapter.deliver(msg, myDeliveryTag); + uint64_t myDeliveryTag = out.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } |
