summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Channel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Channel.cpp')
-rw-r--r--cpp/src/qpid/broker/Channel.cpp175
1 files changed, 85 insertions, 90 deletions
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index 5497eda842..c40811e921 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -21,6 +21,8 @@
#include <sstream>
#include <assert.h>
+using std::mem_fun_ref;
+using std::bind2nd;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
@@ -29,14 +31,17 @@ using namespace qpid::concurrent;
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
id(_id),
out(_out),
- deliveryTag(1),
+ currentDeliveryTag(1),
transactional(false),
prefetchSize(0),
prefetchCount(0),
- outstandingSize(0),
- outstandingCount(0),
framesize(_framesize),
- tagGenerator("sgen"){}
+ tagGenerator("sgen"),
+ store(0),
+ messageBuilder(this){
+
+ outstanding.reset();
+}
Channel::~Channel(){
}
@@ -86,30 +91,36 @@ void Channel::begin(){
}
void Channel::commit(){
-
+ TxAck txAck(accumulatedAck, unacked);
+ txBuffer.enlist(&txAck);
+ if(txBuffer.prepare(store)){
+ txBuffer.commit();
+ }
+ accumulatedAck.clear();
}
void Channel::rollback(){
-
+ txBuffer.rollback();
+ accumulatedAck.clear();
}
void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t deliveryTag = currentDeliveryTag++;
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
- outstandingSize += msg->contentSize();
- outstandingCount++;
+ unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
+ outstanding.size += msg->contentSize();
+ outstanding.count++;
}
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
Locker locker(deliveryLock);
- bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
- bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ bool countOk = !prefetchCount || prefetchCount > unacked.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
@@ -144,43 +155,66 @@ void Channel::ConsumerImpl::requestDispatch(){
if(blocked) queue->dispatch();
}
-void Channel::checkMessage(const std::string& text){
- if(!message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
- }
+void Channel::handlePublish(Message* _message, Exchange* _exchange){
+ Message::shared_ptr message(_message);
+ exchange = _exchange;
+ messageBuilder.initialise(message);
+}
+
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+ messageBuilder.setHeader(header);
+}
+
+void Channel::handleContent(AMQContentBody::shared_ptr content){
+ messageBuilder.addContent(content);
}
-void Channel::handlePublish(Message* msg){
- if(message.get()){
- THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got publish before previous content was completed.");
+void Channel::complete(Message::shared_ptr& msg){
+ if(exchange){
+ if(transactional){
+ TxPublish* deliverable = new TxPublish(msg);
+ exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ txBuffer.enlist(new DeletingTxOp(deliverable));
+ }else{
+ DeliverableMessage deliverable(msg);
+ exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders()));
+ }
+ exchange = 0;
+ }else{
+ std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl;
}
- message = Message::shared_ptr(msg);
}
-void Channel::ack(u_int64_t _deliveryTag, bool multiple){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
-
- ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(_deliveryTag));
- if(i == unacknowledged.end()){
- throw InvalidAckException();
- }else if(multiple){
- unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute prefetch outstanding (note: messages delivered through get are ignored)
- CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
- outstandingSize = calc.getSize();
- outstandingCount = calc.getCount();
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ if(transactional){
+ accumulatedAck.update(deliveryTag, multiple);
+ //TODO: I think the outstanding prefetch size & count should be updated at this point...
+ //TODO: ...this may then necessitate dispatching to consumers
}else{
- if(!i->pull){
- outstandingSize -= i->msg->contentSize();
- outstandingCount--;
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
+ if(i == unacked.end()){
+ throw InvalidAckException();
+ }else if(multiple){
+ ack_iterator end = ++i;
+ for_each(unacked.begin(), end, mem_fun_ref(&DeliveryRecord::discard));
+ unacked.erase(unacked.begin(), end);
+
+ //recalculate the prefetch:
+ outstanding.reset();
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
+ }else{
+ i->discard();
+ i->subtractFrom(&outstanding);
+ unacked.erase(i);
}
- unacknowledged.erase(i);
- }
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
- j->second->requestDispatch();
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
}
}
@@ -188,14 +222,12 @@ void Channel::recover(bool requeue){
Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
- outstandingSize = 0;
- outstandingCount = 0;
- ack_iterator start(unacknowledged.begin());
- ack_iterator end(unacknowledged.end());
- for_each(start, end, Requeue());
- unacknowledged.erase(start, end);
+ outstanding.reset();
+ std::list<DeliveryRecord> copy = unacked;
+ unacked.clear();
+ for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue));
}else{
- for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
}
}
@@ -203,10 +235,10 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Locker locker(deliveryLock);
- u_int64_t myDeliveryTag = deliveryTag++;
+ u_int64_t myDeliveryTag = currentDeliveryTag++;
msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
if(ackExpected){
- unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
return true;
}else{
@@ -214,43 +246,6 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
}
}
-Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
-
-bool Channel::MatchAck::operator()(AckRecord& record) const{
- return tag == record.deliveryTag;
-}
-
-void Channel::Requeue::operator()(AckRecord& record) const{
- record.msg->redeliver();
- record.queue->deliver(record.msg);
-}
-
-Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
-
-void Channel::Redeliver::operator()(AckRecord& record) const{
- if(record.pull){
- //if message was originally sent as response to get, we must requeue it
- record.msg->redeliver();
- record.queue->deliver(record.msg);
- }else{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
- }
-}
-
-Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-
-void Channel::CalculatePrefetch::operator()(AckRecord& record){
- if(!record.pull){
- //ignore messages that were sent in response to get when calculating prefetch
- size += record.msg->contentSize();
- count++;
- }
-}
-
-u_int32_t Channel::CalculatePrefetch::getSize(){
- return size;
-}
-
-u_int16_t Channel::CalculatePrefetch::getCount(){
- return count;
+void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
+ msg->deliver(out, id, consumerTag, deliveryTag, framesize);
}