diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/cluster/ExpiryPolicy.cpp | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/ExpiryPolicy.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/ExpiryPolicy.cpp | 95 |
1 files changed, 5 insertions, 90 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp index d9a7b0122a..0ef5c2a35d 100644 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -21,106 +21,21 @@ #include "qpid/broker/Message.h" #include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/cluster/Cluster.h" #include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace cluster { -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} +ExpiryPolicy::ExpiryPolicy(Cluster& cluster) : cluster(cluster) {} -struct ExpiryTask : public sys::TimerTask { - ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} - void fire() { expiryPolicy->sendExpire(expiryId); } - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t expiryId; -}; - -// Called while receiving an update -void ExpiryPolicy::setId(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expiryId = id; -} - -// Called while giving an update -uint64_t ExpiryPolicy::getId() const { - sys::Mutex::ScopedLock l(lock); - return expiryId; -} - -// Called in enqueuing connection thread -void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id; - { - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - // - sys::Mutex::ScopedLock l(lock); - id = expiryId++; - if (!id) { // This is an update of an already-expired message. - m.setExpiryPolicy(expiredPolicy); - } - else { - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - // If this is an update, the id may already exist - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; - } - } - timer.add(new ExpiryTask(this, id, m.getExpiration())); -} - -// Called in dequeueing connection thread -void ExpiryPolicy::forget(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - assert(i != unexpiredByMessage.end()); - unexpiredById.erase(i->second); - unexpiredByMessage.erase(i); -} - -// Called in dequeueing connection or cleanup thread. bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); -} - -// Called in timer thread -void ExpiryPolicy::sendExpire(uint64_t id) { - { - sys::Mutex::ScopedLock l(lock); - // Don't multicast an expiry notice if message is already forgotten. - if (unexpiredById.find(id) == unexpiredById.end()) return; - } - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); + return m.getExpiration() < cluster.getClusterTime(); } -// Called in CPG deliver thread. -void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); - IdMessageMap::iterator i = expired.first; - while (i != expired.second) { - i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; - unexpiredByMessage.erase(i->second); - unexpiredById.erase(i++); - } +sys::AbsTime ExpiryPolicy::getCurrentTime() { + return cluster.getClusterTime(); } -// Called in update thread on the updater. -boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; -} - -bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } -void ExpiryPolicy::Expired::willExpire(broker::Message&) { } - }} // namespace qpid::cluster |