summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ExpiryPolicy.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-09 17:03:40 +0000
committerAlan Conway <aconway@apache.org>2009-03-09 17:03:40 +0000
commitfaae42761fe83f083d408d2b4f9e95b7c619122e (patch)
tree21013d3445ce52650c9c1a2e8859dd0d931b0f09 /cpp/src/qpid/cluster/ExpiryPolicy.cpp
parentbbbd455d8053d3fea713d8f04b0187ac73c72d83 (diff)
downloadqpid-python-faae42761fe83f083d408d2b4f9e95b7c619122e.tar.gz
Fix cluster TTL: replicte expiry information to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@751760 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/ExpiryPolicy.cpp')
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp45
1 files changed, 22 insertions, 23 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index cc451bf661..409180c499 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -31,46 +31,45 @@ namespace qpid {
namespace cluster {
ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
- : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
- assert(m.getFrames().begin() != m.getFrames().end());
- return m.getFrames().begin()->getClusterId();
-}
+ : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
struct ExpiryTask : public broker::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when), expiryPolicy(policy), messageId(id) {}
- void fire() { expiryPolicy->sendExpire(messageId); }
+ : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+ void fire() { expiryPolicy->sendExpire(expiryId); }
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t messageId;
+ const uint64_t expiryId;
};
-}
void ExpiryPolicy::willExpire(broker::Message& m) {
- timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+ uint64_t id = expiryId++;
+ assert(unexpiredById.find(id) == unexpiredById.end());
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById[id] = &m;
+ unexpiredByMessage[&m] = id;
+ timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- IdSet::iterator i = expired.find(clusterId(m));
- if (i != expired.end()) {
- expired.erase(i);
- const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- return true;
- }
- return false;
+ return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
void ExpiryPolicy::sendExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expired.insert(id);
+ IdMessageMap::iterator i = unexpiredById.find(id);
+ if (i != unexpiredById.end()) {
+ i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ unexpiredByMessage.erase(i->second);
+ unexpiredById.erase(i);
+ }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }