summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch)
tree3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/broker
parentc9a654925355a4dd128d5111af862e8be89e0a45 (diff)
downloadqpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h13
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.cpp36
-rw-r--r--cpp/src/qpid/broker/ExpiryPolicy.h44
-rw-r--r--cpp/src/qpid/broker/Message.cpp22
-rw-r--r--cpp/src/qpid/broker/Message.h9
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp11
7 files changed, 117 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 091f67ec58..95f55bb596 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -30,6 +30,7 @@
#include "SecureConnectionFactory.h"
#include "TopicExchange.h"
#include "Link.h"
+#include "ExpiryPolicy.h"
#include "qmf/org/apache/qpid/broker/Package.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h"
@@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, timer),
queueEvents(poller),
recovery(true),
+ expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
if (conf.enableMgmt) {
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 71b69b51aa..a52a0f67e0 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -36,6 +36,7 @@
#include "Vhost.h"
#include "System.h"
#include "Timer.h"
+#include "ExpiryPolicy.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementBroker.h"
#include "qmf/org/apache/qpid/broker/Broker.h"
@@ -65,6 +66,8 @@ struct Url;
namespace broker {
+class ExpiryPolicy;
+
static const uint16_t DEFAULT_PORT=5672;
struct NoSuchTransportException : qpid::Exception
@@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap;
+ void declareStandardExchange(const std::string& name, const std::string& type);
+
boost::shared_ptr<sys::Poller> poller;
Options config;
management::ManagementAgent::Singleton managementAgentSingleton;
@@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target,
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
QueueEvents queueEvents;
-
- void declareStandardExchange(const std::string& name, const std::string& type);
-
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
std::string federationTag;
-
bool recovery;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
public:
@@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
Options& getOptions() { return config; }
QueueEvents& getQueueEvents() { return queueEvents; }
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
+ boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
+
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..907f1e56e1
--- /dev/null
+++ b/cpp/src/qpid/broker/ExpiryPolicy.cpp
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ExpiryPolicy.h"
+#include "Message.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+ExpiryPolicy::~ExpiryPolicy() {}
+
+void ExpiryPolicy::willExpire(Message&) {}
+
+bool ExpiryPolicy::hasExpired(Message& m) {
+ return m.getExpiration() < sys::AbsTime::now();
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h
new file mode 100644
index 0000000000..1b7316f6f9
--- /dev/null
+++ b/cpp/src/qpid/broker/ExpiryPolicy.h
@@ -0,0 +1,44 @@
+#ifndef QPID_BROKER_EXPIRYPOLICY_H
+#define QPID_BROKER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/RefCounted.h"
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Default expiry policy.
+ */
+class ExpiryPolicy : public RefCounted
+{
+ public:
+ virtual ~ExpiryPolicy();
+ virtual void willExpire(Message&);
+ virtual bool hasExpired(Message&);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index e5a0c3e9e1..ce0477b08c 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -21,6 +21,7 @@
#include "Message.h"
#include "ExchangeRegistry.h"
+#include "ExpiryPolicy.h"
#include "qpid/StringUtils.h"
#include "qpid/framing/frame_functors.h"
#include "qpid/framing/FieldTable.h"
@@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id)
}
}
-void Message::setTimestamp()
+void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e)
{
DeliveryProperties* props = getProperties<DeliveryProperties>();
- //Spec states that timestamp should be set, evaluate the
- //performance impact before re-enabling this:
- //time_t now = ::time(0);
- //props->setTimestamp(now);
if (props->getTtl()) {
- //set expiration (nb: ttl is in millisecs, time_t is in secs)
+ // AMQP requires setting the expiration property to be posix
+ // time_t in seconds. TTL is in milliseconds
time_t now = ::time(0);
props->setExpiration(now + (props->getTtl()/1000));
+ // Use higher resolution time for the internal expiry calculation.
expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC));
+ setExpiryPolicy(e);
}
}
-bool Message::hasExpired() const
+void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) {
+ expiryPolicy = e;
+ if (expiryPolicy)
+ expiryPolicy->willExpire(*this);
+}
+
+bool Message::hasExpired()
{
- return expiration < FAR_FUTURE && expiration < AbsTime::now();
+ return expiryPolicy && expiryPolicy->hasExpired(*this);
}
boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index de716e9441..96fcf61dfc 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -45,6 +45,7 @@ class Exchange;
class ExchangeRegistry;
class MessageStore;
class Queue;
+class ExpiryPolicy;
class Message : public PersistableMessage {
public:
@@ -73,8 +74,11 @@ public:
const framing::FieldTable* getApplicationHeaders() const;
bool isPersistent();
bool requiresAccept();
- void setTimestamp();
- bool hasExpired() const;
+
+ void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e);
+ bool hasExpired();
+ sys::AbsTime getExpiration() const { return expiration; }
framing::FrameSet& getFrames() { return frames; }
const framing::FrameSet& getFrames() const { return frames; }
@@ -171,6 +175,7 @@ public:
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
qpid::sys::AbsTime expiration;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
static TransferAdapter TRANSFER;
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index f9f75679e5..13a8c649d2 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
//TODO: the following should be hidden behind message (using MessageAdapter or similar)
- // Do not replace the delivery-properties.exchange if it is is already set.
- // This is used internally (by the cluster) to force the exchange name on a message.
- // The client library ensures this is always empty for messages from normal clients.
if (msg->isA<MessageTransferBody>()) {
- if (!msg->hasProperties<DeliveryProperties>() ||
- msg->getProperties<DeliveryProperties>()->getExchange().empty())
+ // Do not replace the delivery-properties.exchange if it is is already set.
+ // This is used internally (by the cluster) to force the exchange name on a message.
+ // The client library ensures this is always empty for messages from normal clients.
+ if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty())
msg->getProperties<DeliveryProperties>()->setExchange(exchangeName);
- msg->setTimestamp();
+ msg->setTimestamp(getSession().getBroker().getExpiryPolicy());
}
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getBroker().getExchanges().get(exchangeName);