diff options
Diffstat (limited to 'cpp/src/qpid/broker/Channel.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Channel.cpp | 175 |
1 files changed, 85 insertions, 90 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp index 5497eda842..c40811e921 100644 --- a/cpp/src/qpid/broker/Channel.cpp +++ b/cpp/src/qpid/broker/Channel.cpp @@ -21,6 +21,8 @@ #include <sstream> #include <assert.h> +using std::mem_fun_ref; +using std::bind2nd; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; @@ -29,14 +31,17 @@ using namespace qpid::concurrent; Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : id(_id), out(_out), - deliveryTag(1), + currentDeliveryTag(1), transactional(false), prefetchSize(0), prefetchCount(0), - outstandingSize(0), - outstandingCount(0), framesize(_framesize), - tagGenerator("sgen"){} + tagGenerator("sgen"), + store(0), + messageBuilder(this){ + + outstanding.reset(); +} Channel::~Channel(){ } @@ -86,30 +91,36 @@ void Channel::begin(){ } void Channel::commit(){ - + TxAck txAck(accumulatedAck, unacked); + txBuffer.enlist(&txAck); + if(txBuffer.prepare(store)){ + txBuffer.commit(); + } + accumulatedAck.clear(); } void Channel::rollback(){ - + txBuffer.rollback(); + accumulatedAck.clear(); } void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; + u_int64_t deliveryTag = currentDeliveryTag++; if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); - outstandingSize += msg->contentSize(); - outstandingCount++; + unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); + outstanding.size += msg->contentSize(); + outstanding.count++; } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); + msg->deliver(out, id, consumerTag, deliveryTag, framesize); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ Locker locker(deliveryLock); - bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); - bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); + bool countOk = !prefetchCount || prefetchCount > unacked.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; } @@ -144,43 +155,66 @@ void Channel::ConsumerImpl::requestDispatch(){ if(blocked) queue->dispatch(); } -void Channel::checkMessage(const std::string& text){ - if(!message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); - } +void Channel::handlePublish(Message* _message, Exchange* _exchange){ + Message::shared_ptr message(_message); + exchange = _exchange; + messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ + messageBuilder.setHeader(header); +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ + messageBuilder.addContent(content); } -void Channel::handlePublish(Message* msg){ - if(message.get()){ - THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed."); +void Channel::complete(Message::shared_ptr& msg){ + if(exchange){ + if(transactional){ + TxPublish* deliverable = new TxPublish(msg); + exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + txBuffer.enlist(new DeletingTxOp(deliverable)); + }else{ + DeliverableMessage deliverable(msg); + exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); + } + exchange = 0; + }else{ + std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; } - message = Message::shared_ptr(msg); } -void Channel::ack(u_int64_t _deliveryTag, bool multiple){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery - - ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag)); - if(i == unacknowledged.end()){ - throw InvalidAckException(); - }else if(multiple){ - unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute prefetch outstanding (note: messages delivered through get are ignored) - CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); - outstandingSize = calc.getSize(); - outstandingCount = calc.getCount(); +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + if(transactional){ + accumulatedAck.update(deliveryTag, multiple); + //TODO: I think the outstanding prefetch size & count should be updated at this point... + //TODO: ...this may then necessitate dispatching to consumers }else{ - if(!i->pull){ - outstandingSize -= i->msg->contentSize(); - outstandingCount--; + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); + if(i == unacked.end()){ + throw InvalidAckException(); + }else if(multiple){ + ack_iterator end = ++i; + for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard)); + unacked.erase(unacked.begin(), end); + + //recalculate the prefetch: + outstanding.reset(); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); + }else{ + i->discard(); + i->subtractFrom(&outstanding); + unacked.erase(i); } - unacknowledged.erase(i); - } - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ + j->second->requestDispatch(); + } } } @@ -188,14 +222,12 @@ void Channel::recover(bool requeue){ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ - outstandingSize = 0; - outstandingCount = 0; - ack_iterator start(unacknowledged.begin()); - ack_iterator end(unacknowledged.end()); - for_each(start, end, Requeue()); - unacknowledged.erase(start, end); + outstanding.reset(); + std::list<DeliveryRecord> copy = unacked; + unacked.clear(); + for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); }else{ - for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); + for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this)); } } @@ -203,10 +235,10 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Locker locker(deliveryLock); - u_int64_t myDeliveryTag = deliveryTag++; + u_int64_t myDeliveryTag = currentDeliveryTag++; msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ - unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } return true; }else{ @@ -214,43 +246,6 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ } } -Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} - -bool Channel::MatchAck::operator()(AckRecord& record) const{ - return tag == record.deliveryTag; -} - -void Channel::Requeue::operator()(AckRecord& record) const{ - record.msg->redeliver(); - record.queue->deliver(record.msg); -} - -Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} - -void Channel::Redeliver::operator()(AckRecord& record) const{ - if(record.pull){ - //if message was originally sent as response to get, we must requeue it - record.msg->redeliver(); - record.queue->deliver(record.msg); - }else{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); - } -} - -Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} - -void Channel::CalculatePrefetch::operator()(AckRecord& record){ - if(!record.pull){ - //ignore messages that were sent in response to get when calculating prefetch - size += record.msg->contentSize(); - count++; - } -} - -u_int32_t Channel::CalculatePrefetch::getSize(){ - return size; -} - -u_int16_t Channel::CalculatePrefetch::getCount(){ - return count; +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ + msg->deliver(out, id, consumerTag, deliveryTag, framesize); } |
