diff options
| author | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-02-09 22:25:26 +0000 |
| commit | 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch) | |
| tree | 3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/broker | |
| parent | c9a654925355a4dd128d5111af862e8be89e0a45 (diff) | |
| download | qpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz | |
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ExpiryPolicy.cpp | 36 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ExpiryPolicy.h | 44 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 11 |
7 files changed, 117 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 091f67ec58..95f55bb596 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -30,6 +30,7 @@ #include "SecureConnectionFactory.h" #include "TopicExchange.h" #include "Link.h" +#include "ExpiryPolicy.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerEcho.h" @@ -150,6 +151,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, timer), queueEvents(poller), recovery(true), + expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 71b69b51aa..a52a0f67e0 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -36,6 +36,7 @@ #include "Vhost.h" #include "System.h" #include "Timer.h" +#include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementBroker.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -65,6 +66,8 @@ struct Url; namespace broker { +class ExpiryPolicy; + static const uint16_t DEFAULT_PORT=5672; struct NoSuchTransportException : qpid::Exception @@ -111,6 +114,8 @@ class Broker : public sys::Runnable, public Plugin::Target, private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; + void declareStandardExchange(const std::string& name, const std::string& type); + boost::shared_ptr<sys::Poller> poller; Options config; management::ManagementAgent::Singleton managementAgentSingleton; @@ -132,14 +137,11 @@ class Broker : public sys::Runnable, public Plugin::Target, System::shared_ptr systemObject; QueueCleaner queueCleaner; QueueEvents queueEvents; - - void declareStandardExchange(const std::string& name, const std::string& type); - std::vector<Url> knownBrokers; std::vector<Url> getKnownBrokersImpl(); std::string federationTag; - bool recovery; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; public: @@ -180,6 +182,9 @@ class Broker : public sys::Runnable, public Plugin::Target, Options& getOptions() { return config; } QueueEvents& getQueueEvents() { return queueEvents; } + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } + boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } diff --git a/cpp/src/qpid/broker/ExpiryPolicy.cpp b/cpp/src/qpid/broker/ExpiryPolicy.cpp new file mode 100644 index 0000000000..907f1e56e1 --- /dev/null +++ b/cpp/src/qpid/broker/ExpiryPolicy.cpp @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ExpiryPolicy.h" +#include "Message.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace broker { + +ExpiryPolicy::~ExpiryPolicy() {} + +void ExpiryPolicy::willExpire(Message&) {} + +bool ExpiryPolicy::hasExpired(Message& m) { + return m.getExpiration() < sys::AbsTime::now(); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ExpiryPolicy.h b/cpp/src/qpid/broker/ExpiryPolicy.h new file mode 100644 index 0000000000..1b7316f6f9 --- /dev/null +++ b/cpp/src/qpid/broker/ExpiryPolicy.h @@ -0,0 +1,44 @@ +#ifndef QPID_BROKER_EXPIRYPOLICY_H +#define QPID_BROKER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/RefCounted.h" + +namespace qpid { +namespace broker { + +class Message; + +/** + * Default expiry policy. + */ +class ExpiryPolicy : public RefCounted +{ + public: + virtual ~ExpiryPolicy(); + virtual void willExpire(Message&); + virtual bool hasExpired(Message&); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index e5a0c3e9e1..ce0477b08c 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -21,6 +21,7 @@ #include "Message.h" #include "ExchangeRegistry.h" +#include "ExpiryPolicy.h" #include "qpid/StringUtils.h" #include "qpid/framing/frame_functors.h" #include "qpid/framing/FieldTable.h" @@ -316,24 +317,29 @@ void Message::addTraceId(const std::string& id) } } -void Message::setTimestamp() +void Message::setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e) { DeliveryProperties* props = getProperties<DeliveryProperties>(); - //Spec states that timestamp should be set, evaluate the - //performance impact before re-enabling this: - //time_t now = ::time(0); - //props->setTimestamp(now); if (props->getTtl()) { - //set expiration (nb: ttl is in millisecs, time_t is in secs) + // AMQP requires setting the expiration property to be posix + // time_t in seconds. TTL is in milliseconds time_t now = ::time(0); props->setExpiration(now + (props->getTtl()/1000)); + // Use higher resolution time for the internal expiry calculation. expiration = AbsTime(AbsTime::now(), Duration(props->getTtl() * TIME_MSEC)); + setExpiryPolicy(e); } } -bool Message::hasExpired() const +void Message::setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { + expiryPolicy = e; + if (expiryPolicy) + expiryPolicy->willExpire(*this); +} + +bool Message::hasExpired() { - return expiration < FAR_FUTURE && expiration < AbsTime::now(); + return expiryPolicy && expiryPolicy->hasExpired(*this); } boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* qfor) const diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index de716e9441..96fcf61dfc 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -45,6 +45,7 @@ class Exchange; class ExchangeRegistry; class MessageStore; class Queue; +class ExpiryPolicy; class Message : public PersistableMessage { public: @@ -73,8 +74,11 @@ public: const framing::FieldTable* getApplicationHeaders() const; bool isPersistent(); bool requiresAccept(); - void setTimestamp(); - bool hasExpired() const; + + void setTimestamp(const boost::intrusive_ptr<ExpiryPolicy>& e); + void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e); + bool hasExpired(); + sys::AbsTime getExpiration() const { return expiration; } framing::FrameSet& getFrames() { return frames; } const framing::FrameSet& getFrames() const { return frames; } @@ -171,6 +175,7 @@ public: ConnectionToken* publisher; mutable MessageAdapter* adapter; qpid::sys::AbsTime expiration; + boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; static TransferAdapter TRANSFER; diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index f9f75679e5..13a8c649d2 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -358,14 +358,13 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); //TODO: the following should be hidden behind message (using MessageAdapter or similar) - // Do not replace the delivery-properties.exchange if it is is already set. - // This is used internally (by the cluster) to force the exchange name on a message. - // The client library ensures this is always empty for messages from normal clients. if (msg->isA<MessageTransferBody>()) { - if (!msg->hasProperties<DeliveryProperties>() || - msg->getProperties<DeliveryProperties>()->getExchange().empty()) + // Do not replace the delivery-properties.exchange if it is is already set. + // This is used internally (by the cluster) to force the exchange name on a message. + // The client library ensures this is always empty for messages from normal clients. + if (!msg->hasProperties<DeliveryProperties>() || msg->getProperties<DeliveryProperties>()->getExchange().empty()) msg->getProperties<DeliveryProperties>()->setExchange(exchangeName); - msg->setTimestamp(); + msg->setTimestamp(getSession().getBroker().getExpiryPolicy()); } if (!cacheExchange || cacheExchange->getName() != exchangeName){ cacheExchange = session.getBroker().getExchanges().get(exchangeName); |
