summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp52
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h10
2 files changed, 51 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index e1ba420f2a..d91437c5ba 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTask {
const uint64_t expiryId;
};
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+ sys::Mutex::ScopedLock l(lock);
+ return expiryId;
+}
+
+// Called in enqueuing connection thread
void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id = expiryId++;
- assert(unexpiredById.find(id) == unexpiredById.end());
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- unexpiredById[id] = &m;
- unexpiredByMessage[&m] = id;
+ uint64_t id;
+ {
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ //
+ // TODO: fix update to avoid duplicating messages.
+ sys::Mutex::ScopedLock l(lock);
+ id = expiryId++; // if this is an update, this expiryId may already exist
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+// Called in dequeueing connection thread
void ExpiryPolicy::forget(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
assert(i != unexpiredByMessage.end());
unexpiredById.erase(i->second);
unexpiredByMessage.erase(i);
}
+// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
+// Called in timer thread
void ExpiryPolicy::sendExpire(uint64_t id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // Don't multicast an expiry notice if message is already forgotten.
+ if (unexpiredById.find(id) == unexpiredById.end()) return;
+ }
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
+// Called in CPG deliver thread.
void ExpiryPolicy::deliverExpire(uint64_t id) {
- IdMessageMap::iterator i = unexpiredById.find(id);
- if (i != unexpiredById.end()) {
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+ IdMessageMap::iterator i = expired.first;
+ while (i != expired.second) {
i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i);
+ unexpiredById.erase(i++);
}
}
+// Called in update thread on the updater.
boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
index bdbe3a61dc..77a656aa68 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::ExpiryPolicy
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
- void setId(uint64_t id) { expiryId = id; }
- uint64_t getId() const { return expiryId; }
+ void setId(uint64_t id);
+ uint64_t getId() const;
boost::optional<uint64_t> getId(broker::Message&);
private:
typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- typedef std::map<uint64_t, broker::Message*> IdMessageMap;
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
+ mutable sys::Mutex lock;
MessageIdMap unexpiredByMessage;
IdMessageMap unexpiredById;
uint64_t expiryId;