diff options
| author | Gordon Sim <gsim@apache.org> | 2008-10-15 10:03:49 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-10-15 10:03:49 +0000 |
| commit | 0fcf0505f0d0f3c1e7beb8463b9c58039c2e6d5e (patch) | |
| tree | e4d0001a9fbb35a2a33f46cf108d78fd32eb75c7 | |
| parent | 56fb1f26a04a9ba2b6af40c6933f8dcc79143772 (diff) | |
| download | qpid-python-0fcf0505f0d0f3c1e7beb8463b9c58039c2e6d5e.tar.gz | |
c++ broker: Don't hold on to delivery records for accepted/released messages unless required due to being in windowing mode.
python client: Modified start() on incoming queue to setthe flow mode as credit (not windowing)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704838 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 40 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 1 | ||||
| -rw-r--r-- | cpp/src/tests/DeliveryRecordTest.cpp | 2 | ||||
| -rw-r--r-- | python/qpid/session.py | 1 |
6 files changed, 20 insertions, 47 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 5c74b11a22..e19a61f992 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -32,36 +32,24 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, const std::string _tag, DeliveryToken::shared_ptr _token, const DeliveryId _id, - bool _acquired, bool accepted) : msg(_msg), - queue(_queue), - tag(_tag), - token(_token), - id(_id), - acquired(_acquired), - pull(false), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(accepted) + bool _acquired, bool accepted, + bool _windowing) : msg(_msg), + queue(_queue), + tag(_tag), + token(_token), + id(_id), + acquired(_acquired), + pull(false), + cancelled(false), + credit(msg.payload ? msg.payload->getRequiredCredit() : 0), + size(msg.payload ? msg.payload->contentSize() : 0), + completed(false), + ended(accepted), + windowing(_windowing) { if (accepted) setEnded(); } -DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - Queue::shared_ptr _queue, - const DeliveryId _id) : msg(_msg), - queue(_queue), - id(_id), - acquired(true), - pull(true), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(false) -{} - void DeliveryRecord::setEnded() { ended = true; diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index b5149c5a00..8fdfaa71e6 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -53,12 +53,11 @@ class DeliveryRecord{ bool completed; bool ended; + const bool windowing; public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, - const DeliveryId id, bool acquired, bool confirmed = false); - DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId id); - + const DeliveryId id, bool acquired, bool confirmed, bool windowing); bool matches(DeliveryId tag) const; bool matchOrAfter(DeliveryId tag) const; bool after(DeliveryId tag) const; @@ -77,7 +76,7 @@ class DeliveryRecord{ bool isAcquired() const { return acquired; } bool isComplete() const { return completed; } - bool isRedundant() const { return ended && completed; } + bool isRedundant() const { return ended && (!windowing || completed); } uint32_t getCredit() const; const std::string& getTag() const { return tag; } diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 6fb38eb674..26aea36b8a 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -265,7 +265,7 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg) DeliveryId deliveryTag = parent->deliveryAdapter.deliver(msg, token); if (windowing || ackExpected || !acquire) { - parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected)); + parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected, windowing)); } if (acquire && !ackExpected) { queue->dequeue(0, msg); @@ -444,20 +444,6 @@ void SemanticState::recover(bool requeue) } } -bool SemanticState::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) -{ - QueuedMessage msg = queue->get(); - if(msg.payload){ - DeliveryId myDeliveryTag = deliveryAdapter.deliver(msg, token); - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); - } - return true; - }else{ - return false; - } -} - DeliveryId SemanticState::redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token) { return deliveryAdapter.deliver(msg, token); diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 8207212fd9..4d23220692 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -179,7 +179,6 @@ class SemanticState : public sys::OutputTask, void flush(const std::string& destination); void stop(const std::string& destination); - bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void startTx(); void commit(MessageStore* const store); void rollback(); diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp index 43161b4065..9bdc456993 100644 --- a/cpp/src/tests/DeliveryRecordTest.cpp +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -45,7 +45,7 @@ QPID_AUTO_TEST_CASE(testSort) list<DeliveryRecord> records; for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) { - records.push_back(DeliveryRecord(QueuedMessage(0), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false)); + records.push_back(DeliveryRecord(QueuedMessage(0), Queue::shared_ptr(), "tag", DeliveryToken::shared_ptr(), *i, false, false, false)); } records.sort(); diff --git a/python/qpid/session.py b/python/qpid/session.py index 2f70461ab6..f500ab112f 100644 --- a/python/qpid/session.py +++ b/python/qpid/session.py @@ -344,6 +344,7 @@ class Incoming(Queue): self.destination = destination def start(self): + self.session.message_set_flow_mode(self.destination, self.session.flow_mode.credit) for unit in self.session.credit_unit.values(): self.session.message_flow(self.destination, unit, 0xFFFFFFFF) |
