diff options
| author | Gordon Sim <gsim@apache.org> | 2007-10-05 17:28:48 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-10-05 17:28:48 +0000 |
| commit | 56f678f423f49dff9a79f3f6a0660f00e7da5cc3 (patch) | |
| tree | 9ce1540e75c51db731ba39266e927f4766107173 /cpp/src/qpid | |
| parent | 7defea948eda5d75e2cb665b2c46ab9aecbb73e0 (diff) | |
| download | qpid-python-56f678f423f49dff9a79f3f6a0660f00e7da5cc3.tar.gz | |
Don't recover messages for cancelled subscriptions.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@582353 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 23 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Message.h | 5 |
4 files changed, 30 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 62c2002801..55c8e214f4 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -39,18 +39,21 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, id(_id), acquired(_acquired), confirmed(_confirmed), - pull(false) + pull(false), + cancelled(false) { } DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, Queue::shared_ptr _queue, const DeliveryId _id) : msg(_msg), - queue(_queue), - id(_id), - acquired(true), - confirmed(false), - pull(true){} + queue(_queue), + id(_id), + acquired(true), + confirmed(false), + pull(true), + cancelled(false) +{} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ @@ -76,7 +79,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const } void DeliveryRecord::redeliver(SemanticState* const session) { - if (!confirmed) { + if (!confirmed && !cancelled) { if(pull){ //if message was originally sent as response to get, we must requeue it requeue(); @@ -149,6 +152,12 @@ void DeliveryRecord::acquire(std::vector<DeliveryId>& results) { } } +void DeliveryRecord::cancel(const std::string& cancelledTag) +{ + if (tag == cancelledTag) + cancelled = true; +} + namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 7312cd15b0..52e16a0c06 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -49,6 +49,7 @@ class DeliveryRecord{ bool acquired; const bool confirmed; const bool pull; + bool cancelled; public: DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token, @@ -63,6 +64,7 @@ class DeliveryRecord{ void requeue() const; void release(); void reject(); + void cancel(const std::string& tag); void redeliver(SemanticState* const); void updateByteCredit(uint32_t& credit) const; void addTo(Prefetch&) const; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index e435ed0522..cf16541949 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -95,8 +95,14 @@ void SemanticState::cancel(const string& tag){ // consumers is a ptr_map so erase will delete the consumer // which will call cancel. ConsumerImplMap::iterator i = consumers.find(tag); - if (i != consumers.end()) + if (i != consumers.end()) { consumers.erase(i); + //should cancel all unacked messages for this consumer so that + //they are not redelivered on recovery + Mutex::ScopedLock locker(deliveryLock); + for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag)); + + } } diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h index dab7f3c8d8..1fc199e8d8 100644 --- a/cpp/src/qpid/client/Message.h +++ b/cpp/src/qpid/client/Message.h @@ -75,6 +75,11 @@ public: return method; } + const framing::SequenceNumber& getId() const + { + return id; + } + private: //method and id are only set for received messages: const framing::MessageTransferBody method; |
