summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
committerAlan Conway <aconway@apache.org>2009-02-09 22:25:26 +0000
commit3a60db0672b78a75c52f39f5cefeeb00d3eeba97 (patch)
tree3f9c211e3649a3ef8a883e95d741387cf402dd17 /cpp/src/qpid/cluster
parentc9a654925355a4dd128d5111af862e8be89e0a45 (diff)
downloadqpid-python-3a60db0672b78a75c52f39f5cefeeb00d3eeba97.tar.gz
Cluster support for message time-to-live.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@742774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp31
-rw-r--r--cpp/src/qpid/cluster/Cluster.h12
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp2
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--cpp/src/qpid/cluster/Connection.h2
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp80
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h76
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp11
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h8
9 files changed, 202 insertions, 28 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 8e6ece11cc..be04eebc57 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -76,6 +76,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
+ void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -103,6 +104,8 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
poller),
connections(*this),
decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ frameId(0),
initialized(false),
state(INIT),
lastSize(0),
@@ -134,6 +137,7 @@ void Cluster::initialize() {
myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT));
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
+ broker.setExpiryPolicy(expiryPolicy);
dispatcher.start();
deliverEventQueue.start();
deliverFrameQueue.start();
@@ -238,7 +242,8 @@ void Cluster::deliveredEvent(const Event& e) {
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
QPID_LOG(trace, *this << " DLVR: " << e);
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
if (e.isCluster()) { // Cluster control frame
@@ -333,22 +338,23 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
- ClusterMap::Set members = map.getAlive();
- members.erase(myId);
- myElders = members;
+ elders = map.getAlive();
+ elders.erase(myId);
broker.getLinks().setPassive(true);
}
}
else if (state >= READY && memberChange) {
memberUpdate(l);
- myElders = ClusterMap::intersection(myElders, map.getAlive());
- if (myElders.empty()) {
+ elders = ClusterMap::intersection(elders, map.getAlive());
+ if (elders.empty()) {
//assume we are oldest, reactive links if necessary
broker.getLinks().setPassive(false);
}
}
}
+bool Cluster::isLeader() const { return elders.empty(); }
+
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
@@ -420,15 +426,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
deliverFrameQueue.stop();
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, connections.values(),
- boost::bind(&Cluster::updateOutDone, this),
- boost::bind(&Cluster::updateOutError, this, _1)));
+ new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m) {
+void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
Lock l(lock);
updatedMap = m;
+ frameId = fid;
checkUpdateIn(l);
}
@@ -573,4 +580,8 @@ void Cluster::setClusterId(const Uuid& uuid) {
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
+void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
+ expiryPolicy->deliverExpire(id);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index f7955aa743..4d994943f7 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -31,6 +31,7 @@
#include "Quorum.h"
#include "Decoder.h"
#include "PollableQueue.h"
+#include "ExpiryPolicy.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/Monitor.h"
@@ -89,7 +90,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&);
+ void updateInDone(const ClusterMap&, uint64_t frameId);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -100,6 +101,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
size_t getReadMax() { return readMax; }
size_t getWriteEstimate() { return writeEstimate; }
+
+ bool isLeader() const; // Called in deliver thread.
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -129,6 +132,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -185,7 +189,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
- ClusterMap::Set myElders;
qpid::management::ManagementAgent* mAgent;
// Thread safe members
@@ -197,8 +200,11 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Called only from event delivery thread
+ // Used only in delivery thread
Decoder decoder;
+ ClusterMap::Set elders;
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ uint64_t frameId;
// Used only during initialization
bool initialized;
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 2f7d12dcfe..d54d8389e0 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -54,7 +54,7 @@ struct ClusterValues {
bool quorum;
size_t readMax, writeEstimate;
- ClusterValues() : quorum(false), readMax(3), writeEstimate(64) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9ea79fa2b6..295705e967 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -127,10 +127,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
- else if (body.type() == HEADER_BODY) {
- const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>();
- if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
- }
if (!message.empty())
connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message);
return !message.empty();
@@ -259,9 +255,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members), frameId);
self.second = 0; // Mark this as completed update connection.
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 160855dc2d..a6e9aa65f9 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -119,7 +119,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
new file mode 100644
index 0000000000..690acfc3ad
--- /dev/null
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ExpiryPolicy.h"
+#include "Multicaster.h"
+#include "qpid/framing/ClusterMessageExpiredBody.h"
+#include "qpid/sys/Time.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+ExpiryPolicy::ExpiryPolicy(const boost::function<bool()> & f, Multicaster& m, const MemberId& id, broker::Timer& t)
+ : expiredPolicy(new Expired), isLeader(f), mcast(m), memberId(id), timer(t) {}
+
+namespace {
+uint64_t clusterId(const broker::Message& m) {
+ assert(m.getFrames().begin() != m.getFrames().end());
+ return m.getFrames().begin()->getClusterId();
+}
+
+struct ExpiryTask : public broker::TimerTask {
+ ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
+ : TimerTask(when), expiryPolicy(policy), messageId(id) {}
+ void fire() { expiryPolicy->sendExpire(messageId); }
+ boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+ const uint64_t messageId;
+};
+}
+
+void ExpiryPolicy::willExpire(broker::Message& m) {
+ timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+}
+
+bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
+ IdSet::iterator i = expired.find(clusterId(m));
+ if (i != expired.end()) {
+ expired.erase(i);
+ const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ return true;
+ }
+ return false;
+}
+
+void ExpiryPolicy::sendExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ if (isLeader())
+ mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
+}
+
+void ExpiryPolicy::deliverExpire(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expired.insert(id);
+}
+
+bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
+void ExpiryPolicy::Expired::willExpire(broker::Message&) { }
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
new file mode 100644
index 0000000000..7fb63c731e
--- /dev/null
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLUSTER_EXPIRYPOLICY_H
+#define QPID_CLUSTER_EXPIRYPOLICY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+#include <boost/intrusive_ptr.hpp>
+#include <set>
+
+namespace qpid {
+
+namespace broker { class Timer; }
+
+namespace cluster {
+class Multicaster;
+
+/**
+ * Cluster expiry policy
+ */
+class ExpiryPolicy : public broker::ExpiryPolicy
+{
+ public:
+ ExpiryPolicy(const boost::function<bool()> & isLeader, Multicaster&, const MemberId&, broker::Timer&);
+
+ void willExpire(broker::Message&);
+
+ bool hasExpired(broker::Message&);
+
+ // Send expiration notice to cluster.
+ void sendExpire(uint64_t);
+
+ // Cluster delivers expiry notice.
+ void deliverExpire(uint64_t);
+
+ private:
+ sys::Mutex lock;
+ typedef std::set<uint64_t> IdSet;
+
+ struct Expired : public broker::ExpiryPolicy {
+ bool hasExpired(broker::Message&);
+ void willExpire(broker::Message&);
+ };
+
+ IdSet expired;
+ boost::intrusive_ptr<Expired> expiredPolicy;
+ boost::function<bool()> isLeader;
+ Multicaster& mcast;
+ MemberId memberId;
+ broker::Timer& timer;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 91d4c6d3ce..e50c936b50 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -86,10 +86,12 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
- const boost::function<void()>& ok,
- const boost::function<void(const std::exception&)>& fail)
- : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons),
+ broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ const Cluster::Connections& cons,
+ const boost::function<void()>& ok,
+ const boost::function<void(const std::exception&)>& fail)
+ : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
+ frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
@@ -120,6 +122,7 @@ void UpdateClient::update() {
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
+ membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 93dca9f0c6..0819eb4cdb 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -63,9 +63,10 @@ class UpdateClient : public sys::Runnable {
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
- const boost::function<void()>& done,
- const boost::function<void(const std::exception&)>& fail);
+ broker::Broker& donor, const ClusterMap& map, uint64_t sequence,
+ const std::vector<boost::intrusive_ptr<Connection> >& ,
+ const boost::function<void()>& done,
+ const boost::function<void(const std::exception&)>& fail);
~UpdateClient();
void update();
@@ -89,6 +90,7 @@ class UpdateClient : public sys::Runnable {
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
+ uint64_t frameId;
std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
client::AsyncSession session, shadowSession;