summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Message.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch)
tree3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/broker/Message.cpp
parentc9a654925355a4dd128d5111af862e8be89e0a45 (diff)
downloadqpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Message.cpp')
-rw-r--r--cpp/src/qpid/broker/Message.cpp22
1 files changed, 14 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e5a0c3e9e1..ce0477b08c 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -21,6 +21,7 @@
#include "Message.h"
#include "ExchangeRegistry.h"
+#include "ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp()
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
DeliveryProperties* props = getProperties<DeliveryProperties>();
- //Spec states that timestamp should be set, evaluate the
- //performance impact before re-enabling this:
- //time_t now = ::time(0);
- //props->setTimestamp(now);
if (props->getTtl()) {
- //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
+ // Use higher resolution time for the internal expiry calculation.
expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ setExpiryPolicy(e);
}
}
-bool Message::hasExpired() const
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
{
- return expiration < FAR_FUTURE && expiration < AbsTime::now();
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const