diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 |
4 files changed, 18 insertions, 46 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 5c74b11a22..e19a61f992 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -32,36 +32,24 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, const std::string _tag, DeliveryToken::shared_ptr _token, const DeliveryId _id, - bool _acquired, bool accepted) : msg(_msg), - queue(_queue), - tag(_tag), - token(_token), - id(_id), - acquired(_acquired), - pull(false), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(accepted) + bool _acquired, bool accepted, + bool _windowing) : msg(_msg), + queue(_queue), + tag(_tag), + token(_token), + id(_id), + acquired(_acquired), + pull(false), + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(accepted), + windowing(_windowing) { if (accepted) setEnded(); } -DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - Queue::shared_ptr _queue, - const DeliveryId _id) : msg(_msg), - queue(_queue), - id(_id), - acquired(true), - pull(true), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(false) -{} - void DeliveryRecord::setEnded() { ended = true; diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index b5149c5a00..8fdfaa71e6 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -53,12 +53,11 @@ class DeliveryRecord{ bool completed; bool ended; + const bool windowing; public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, - const DeliveryId id, bool acquired, bool confirmed = false); - DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); - + const DeliveryId id, bool acquired, bool confirmed, bool windowing); bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; @@ -77,7 +76,7 @@ class DeliveryRecord{ bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } - bool isRedundant() const { return ended && completed; } + bool isRedundant() const { return ended && (!windowing || completed); } uint32_t getCredit() const; const std::string& getTag() const { return tag; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 6fb38eb674..26aea36b8a 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -265,7 +265,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter.deliver(msg, token); if (windowing || ackExpected || !acquire) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing)); } if (acquire && !ackExpected) { queue->dequeue(0, msg); @@ -444,20 +444,6 @@ void SemanticState::recover(bool requeue) } } -bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) -{ - QueuedMessage msg = queue->get(); - if(msg.payload){ - DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { return deliveryAdapter.deliver(msg, token); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8207212fd9..4d23220692 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -179,7 +179,6 @@ class SemanticState : public sys::OutputTask, void flush(const std::string& destination); void stop(const std::string& destination); - bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); void commit(MessageStore* const store); void rollback(); |
