diff options
| author | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
| commit | 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch) | |
| tree | 3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/broker/Message.cpp | |
| parent | c9a654925355a4dd128d5111af862e8be89e0a45 (diff) | |
| download | qpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz | |
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5a0c3e9e1..ce0477b08c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { DeliveryProperties* props = getProperties<DeliveryProperties>(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const |
