From 3a60db0672b78a75c52f39f5cefeeb00d3eeba97 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 9 Feb 2009 22:25:26 +0000 Subject: Cluster support for message time-to-live. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/cluster/Cluster.cpp | 31 ++++++++----- cpp/src/qpid/cluster/Cluster.h | 12 +++-- cpp/src/qpid/cluster/ClusterPlugin.cpp | 2 +- cpp/src/qpid/cluster/Connection.cpp | 8 +--- cpp/src/qpid/cluster/Connection.h | 2 +- cpp/src/qpid/cluster/ExpiryPolicy.cpp | 80 ++++++++++++++++++++++++++++++++++ cpp/src/qpid/cluster/ExpiryPolicy.h | 76 ++++++++++++++++++++++++++++++++ cpp/src/qpid/cluster/UpdateClient.cpp | 11 +++-- cpp/src/qpid/cluster/UpdateClient.h | 8 ++-- 9 files changed, 202 insertions(+), 28 deletions(-) create mode 100644 cpp/src/qpid/cluster/ExpiryPolicy.cpp create mode 100644 cpp/src/qpid/cluster/ExpiryPolicy.h (limited to 'cpp/src/qpid/cluster') diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 8e6ece11cc..be04eebc57 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } + void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b poller), connections(*this), decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + frameId(0), initialized(false), state(INIT), lastSize(0), @@ -134,6 +137,7 @@ void Cluster::initialize() { myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); + broker.setExpiryPolicy(expiryPolicy); dispatcher.start(); deliverEventQueue.start(); deliverFrameQueue.start(); @@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) { // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + const_cast(e.frame).setClusterId(frameId++); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); if (e.isCluster()) { // Cluster control frame @@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); - ClusterMap::Set members = map.getAlive(); - members.erase(myId); - myElders = members; + elders = map.getAlive(); + elders.erase(myId); broker.getLinks().setPassive(true); } } else if (state >= READY && memberChange) { memberUpdate(l); - myElders = ClusterMap::intersection(myElders, map.getAlive()); - if (myElders.empty()) { + elders = ClusterMap::intersection(elders, map.getAlive()); + if (elders.empty()) { //assume we are oldest, reactive links if necessary broker.getLinks().setPassive(false); } } } +bool Cluster::isLeader() const { return elders.empty(); } + void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, connections.values(), - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1))); + new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { +void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { Lock l(lock); updatedMap = m; + frameId = fid; checkUpdateIn(l); } @@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) { QPID_LOG(debug, *this << " cluster-id = " << clusterId); } +void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { + expiryPolicy->deliverExpire(id); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index f7955aa743..4d994943f7 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -31,6 +31,7 @@ #include "Quorum.h" #include "Decoder.h" #include "PollableQueue.h" +#include "ExpiryPolicy.h" #include "qpid/broker/Broker.h" #include "qpid/sys/Monitor.h" @@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&); + void updateInDone(const ClusterMap&, uint64_t frameId); MemberId getId() const; broker::Broker& getBroker() const; @@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t getReadMax() { return readMax; } size_t getWriteEstimate() { return writeEstimate; } + + bool isLeader() const; // Called in deliver thread. private: typedef sys::Monitor::ScopedLock Lock; @@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); + void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; - ClusterMap::Set myElders; qpid::management::ManagementAgent* mAgent; // Thread safe members @@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr failoverExchange; Quorum quorum; - // Called only from event delivery thread + // Used only in delivery thread Decoder decoder; + ClusterMap::Set elders; + boost::intrusive_ptr expiryPolicy; + uint64_t frameId; // Used only during initialization bool initialized; diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 2f7d12dcfe..d54d8389e0 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -54,7 +54,7 @@ struct ClusterValues { bool quorum; size_t readMax, writeEstimate; - ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {} + ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9ea79fa2b6..295705e967 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) { case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; } } - else if (body.type() == HEADER_BODY) { - const DeliveryProperties* dp = static_cast(body).get(); - if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster."; - } if (!message.empty()) connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message); return !message.empty(); @@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members), frameId); self.second = 0; // Mark this as completed update connection. } diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 160855dc2d..a6e9aa65f9 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -119,7 +119,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp new file mode 100644 index 0000000000..690acfc3ad --- /dev/null +++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ExpiryPolicy.h" +#include "Multicaster.h" +#include "qpid/framing/ClusterMessageExpiredBody.h" +#include "qpid/sys/Time.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Timer.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +ExpiryPolicy::ExpiryPolicy(const boost::function & f, Multicaster& m, const MemberId& id, broker::Timer& t) + : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {} + +namespace { +uint64_t clusterId(const broker::Message& m) { + assert(m.getFrames().begin() != m.getFrames().end()); + return m.getFrames().begin()->getClusterId(); +} + +struct ExpiryTask : public broker::TimerTask { + ExpiryTask(const boost::intrusive_ptr& policy, uint64_t id, sys::AbsTime when) + : TimerTask(when), expiryPolicy(policy), messageId(id) {} + void fire() { expiryPolicy->sendExpire(messageId); } + boost::intrusive_ptr expiryPolicy; + const uint64_t messageId; +}; +} + +void ExpiryPolicy::willExpire(broker::Message& m) { + timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); +} + +bool ExpiryPolicy::hasExpired(broker::Message& m) { + sys::Mutex::ScopedLock l(lock); + IdSet::iterator i = expired.find(clusterId(m)); + if (i != expired.end()) { + expired.erase(i); + const_cast(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; + return true; + } + return false; +} + +void ExpiryPolicy::sendExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + if (isLeader()) + mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); +} + +void ExpiryPolicy::deliverExpire(uint64_t id) { + sys::Mutex::ScopedLock l(lock); + expired.insert(id); +} + +bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } +void ExpiryPolicy::Expired::willExpire(broker::Message&) { } + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h new file mode 100644 index 0000000000..7fb63c731e --- /dev/null +++ b/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLUSTER_EXPIRYPOLICY_H +#define QPID_CLUSTER_EXPIRYPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/sys/Mutex.h" +#include +#include +#include + +namespace qpid { + +namespace broker { class Timer; } + +namespace cluster { +class Multicaster; + +/** + * Cluster expiry policy + */ +class ExpiryPolicy : public broker::ExpiryPolicy +{ + public: + ExpiryPolicy(const boost::function & isLeader, Multicaster&, const MemberId&, broker::Timer&); + + void willExpire(broker::Message&); + + bool hasExpired(broker::Message&); + + // Send expiration notice to cluster. + void sendExpire(uint64_t); + + // Cluster delivers expiry notice. + void deliverExpire(uint64_t); + + private: + sys::Mutex lock; + typedef std::set IdSet; + + struct Expired : public broker::ExpiryPolicy { + bool hasExpired(broker::Message&); + void willExpire(broker::Message&); + }; + + IdSet expired; + boost::intrusive_ptr expiredPolicy; + boost::function isLeader; + Multicaster& mcast; + MemberId memberId; + broker::Timer& timer; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 91d4c6d3ce..e50c936b50 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, - const boost::function& ok, - const boost::function& fail) - : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), + broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + const Cluster::Connections& cons, + const boost::function& ok, + const boost::function& fail) + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), + frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { @@ -120,6 +122,7 @@ void UpdateClient::update() { ClusterConnectionMembershipBody membership; map.toMethodBody(membership); + membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 93dca9f0c6..0819eb4cdb 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, const std::vector >& , - const boost::function& done, - const boost::function& fail); + broker::Broker& donor, const ClusterMap& map, uint64_t sequence, + const std::vector >& , + const boost::function& done, + const boost::function& fail); ~UpdateClient(); void update(); @@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; + uint64_t frameId; std::vector > connections; client::Connection connection, shadowConnection; client::AsyncSession session, shadowSession; -- cgit v1.2.1