summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp23
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp8
-rw-r--r--cpp/src/qpid/client/Message.h5
4 files changed, 30 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 62c2002801..55c8e214f4 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -39,18 +39,21 @@ DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
id(_id),
acquired(_acquired),
confirmed(_confirmed),
- pull(false)
+ pull(false),
+ cancelled(false)
{
}
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
const DeliveryId _id) : msg(_msg),
- queue(_queue),
- id(_id),
- acquired(true),
- confirmed(false),
- pull(true){}
+ queue(_queue),
+ id(_id),
+ acquired(true),
+ confirmed(false),
+ pull(true),
+ cancelled(false)
+{}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -76,7 +79,7 @@ bool DeliveryRecord::coveredBy(const framing::AccumulatedAck* const range) const
}
void DeliveryRecord::redeliver(SemanticState* const session) {
- if (!confirmed) {
+ if (!confirmed && !cancelled) {
if(pull){
//if message was originally sent as response to get, we must requeue it
requeue();
@@ -149,6 +152,12 @@ void DeliveryRecord::acquire(std::vector<DeliveryId>& results) {
}
}
+void DeliveryRecord::cancel(const std::string& cancelledTag)
+{
+ if (tag == cancelledTag)
+ cancelled = true;
+}
+
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 7312cd15b0..52e16a0c06 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -49,6 +49,7 @@ class DeliveryRecord{
bool acquired;
const bool confirmed;
const bool pull;
+ bool cancelled;
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
@@ -63,6 +64,7 @@ class DeliveryRecord{
void requeue() const;
void release();
void reject();
+ void cancel(const std::string& tag);
void redeliver(SemanticState* const);
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index e435ed0522..cf16541949 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -95,8 +95,14 @@ void SemanticState::cancel(const string& tag){
// consumers is a ptr_map so erase will delete the consumer
// which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
+ if (i != consumers.end()) {
consumers.erase(i);
+ //should cancel all unacked messages for this consumer so that
+ //they are not redelivered on recovery
+ Mutex::ScopedLock locker(deliveryLock);
+ for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
+
+ }
}
diff --git a/cpp/src/qpid/client/Message.h b/cpp/src/qpid/client/Message.h
index dab7f3c8d8..1fc199e8d8 100644
--- a/cpp/src/qpid/client/Message.h
+++ b/cpp/src/qpid/client/Message.h
@@ -75,6 +75,11 @@ public:
return method;
}
+ const framing::SequenceNumber& getId() const
+ {
+ return id;
+ }
+
private:
//method and id are only set for received messages:
const framing::MessageTransferBody method;