summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
committerStephen D. Huston <shuston@apache.org>2011-10-21 14:42:12 +0000
commitf83677056891e436bf5ba99e79240df2a44528cd (patch)
tree625bfd644b948e89105630759cf6decb0435354d /cpp/src/qpid/cluster/Cluster.cpp
parentebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff)
downloadqpid-python-QPID-2519.tar.gz
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp169
1 files changed, 117 insertions, 52 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index dd4882774b..e6e3de64f2 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,45 +36,45 @@
*
* IMPORTANT NOTE: any time code is added to the broker that uses timers,
* the cluster may need to be updated to take account of this.
- *
+ *
*
* USE OF TIMESTAMPS IN THE BROKER
- *
+ *
* The following are the current areas where broker uses timers or timestamps:
- *
+ *
* - Producer flow control: broker::SemanticState uses
* connection::getClusterOrderOutput. a FrameHandler that sends
* frames to the client via the cluster. Used by broker::SessionState
- *
+ *
* - QueueCleaner, Message TTL: uses ExpiryPolicy, which is
* implemented by cluster::ExpiryPolicy.
- *
+ *
* - Connection heartbeat: sends connection controls, not part of
* session command counting so OK to ignore.
- *
+ *
* - LinkRegistry: only cluster elder is ever active for links.
- *
+ *
* - management::ManagementBroker: uses MessageHandler supplied by cluster
* to send messages to the broker via the cluster.
- *
- * - Dtx: not yet supported with cluster.
*
- * cluster::ExpiryPolicy implements the strategy for message expiry.
+ * cluster::ExpiryPolicy uses cluster time.
*
* ClusterTimer implements periodic timed events in the cluster context.
- * Used for periodic management events.
+ * Used for:
+ * - periodic management events.
+ * - DTX transaction timeouts.
*
* <h1>CLUSTER PROTOCOL OVERVIEW</h1>
- *
+ *
* Messages sent to/from CPG are called Events.
*
* An Event carries a ConnectionId, which includes a MemberId and a
* connection number.
- *
+ *
* Events are either
* - Connection events: non-0 connection number and are associated with a connection.
* - Cluster Events: 0 connection number, are not associated with a connection.
- *
+ *
* Events are further categorized as:
* - Control: carries method frame(s) that affect cluster behavior.
* - Data: carries raw data received from a client connection.
@@ -146,6 +146,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/framing/ClusterClockBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAbortBody.h"
#include "qpid/framing/ClusterRetractOfferBody.h"
@@ -198,7 +199,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster;
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 1058747;
+const uint32_t Cluster::CLUSTER_VERSION = 1159329;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -214,7 +215,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
{
cluster.initialStatus(
member, version, active, clusterId,
- framing::cluster::StoreState(storeState), shutdownId,
+ framing::cluster::StoreState(storeState), shutdownId,
firstConfig, l);
}
void ready(const std::string& url) {
@@ -230,21 +231,21 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
- void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) {
cluster.errorCheck(member, type, frameSeq, l);
}
void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); }
- void timerDrop(const std::string& name) { cluster.timerWakeup(member, name, l); }
+ void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); }
void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); }
void deliverToQueue(const std::string& queue, const std::string& message) {
cluster.deliverToQueue(queue, message, l);
}
+ void clock(uint64_t time) { cluster.clock(time, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
- settings(set),
+ settings(set),
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -253,7 +254,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
self(cpg.self()),
clusterId(true),
mAgent(0),
- expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(*this)),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1),
@@ -277,8 +278,11 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
lastBroker(false),
updateRetracted(false),
updateClosed(false),
- error(*this)
+ error(*this),
+ acl(0)
{
+ broker.setInCluster(true);
+
// We give ownership of the timer to the broker and keep a plain pointer.
// This is OK as it means the timer has the same lifetime as the broker.
timer = new ClusterTimer(*this);
@@ -299,7 +303,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
// Load my store status before we go into initialization
if (! broker::NullMessageStore::isNullStore(&broker.getStore())) {
store.load();
- clusterId = store.getClusterId();
+ clusterId = store.getClusterId();
QPID_LOG(notice, "Cluster store state: " << store)
}
cpg.join(name);
@@ -360,14 +364,15 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
// Safe to use connections here because we're pre-catchup, stalled
// and discarding, so deliveredFrame is not processing any
// connection events.
- assert(discarding);
+ assert(discarding);
pair<ConnectionMap::iterator, bool> ib
= connections.insert(ConnectionMap::value_type(c->getId(), c));
- assert(ib.second);
+ // Like this to avoid tripping up unused variable warning when NDEBUG set
+ if (!ib.second) assert(ib.second);
}
void Cluster::erase(const ConnectionId& id) {
- Lock l(lock);
+ Lock l(lock);
erase(id,l);
}
@@ -393,9 +398,9 @@ std::vector<Url> Cluster::getUrls() const {
std::vector<Url> Cluster::getUrls(Lock&) const {
return map.memberUrls();
-}
+}
-void Cluster::leave() {
+void Cluster::leave() {
Lock l(lock);
leave(l);
}
@@ -405,7 +410,7 @@ void Cluster::leave() {
QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
} do {} while(0)
-void Cluster::leave(Lock&) {
+void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
@@ -424,7 +429,7 @@ void Cluster::deliver(
uint32_t nodeid,
uint32_t pid,
void* msg,
- int msg_len)
+ int msg_len)
{
MemberId from(nodeid, pid);
framing::Buffer buf(static_cast<char*>(msg), msg_len);
@@ -455,7 +460,7 @@ void Cluster::deliveredEvent(const Event& e) {
EventFrame ef(e, e.getFrame());
// Stop the deliverEventQueue on update offers.
// This preserves the connection decoder fragments for an update.
- // Only do this for the two brokers that are directly involved in this
+ // Only do this for the two brokers that are directly involved in this
// offer: the one making the offer, or the one receiving it.
const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody());
if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) {
@@ -465,7 +470,7 @@ void Cluster::deliveredEvent(const Event& e) {
}
deliverFrame(ef);
}
- else if(!discarding) {
+ else if(!discarding) {
if (e.isControl())
deliverFrame(EventFrame(e, e.getFrame()));
else {
@@ -507,7 +512,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
// the event queue.
e.frame = AMQFrame(
ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee()));
- deliverEventQueue.start();
+ deliverEventQueue.start();
}
// Process each frame through the error checker.
if (error.isUnresolved()) {
@@ -515,14 +520,14 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
}
- else
+ else
processFrame(e, l);
}
void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ QPID_LOG_IF(trace, loggable(e.frame), *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
throw Exception(QPID_MSG("Invalid cluster control"));
@@ -531,14 +536,15 @@ void Cluster::processFrame(const EventFrame& e, Lock& l) {
map.incrementFrameSeq();
ConnectionPtr connection = getConnection(e, l);
if (connection) {
- QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
+ QPID_LOG_IF(trace, loggable(e.frame),
+ *this << " DLVR " << map.getFrameSeq() << ": " << e);
connection->deliveredFrame(e);
}
else
- QPID_LOG(trace, *this << " DROP (no connection): " << e);
+ throw Exception(QPID_MSG("Unknown connection: " << e));
}
else // Drop connection frames while state < CATCHUP
- QPID_LOG(trace, *this << " DROP (joining): " << e);
+ QPID_LOG_IF(trace, loggable(e.frame), *this << " DROP (joining): " << e);
}
// Called in deliverFrameQueue thread
@@ -577,7 +583,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) {
}
// CPG config-change callback.
-void Cluster::configChange (
+void Cluster::configChange (
cpg_handle_t /*handle*/,
const cpg_name */*group*/,
const cpg_address *members, int nMembers,
@@ -607,7 +613,7 @@ void Cluster::setReady(Lock&) {
}
// Set the management status from the Cluster::state.
-//
+//
// NOTE: Management updates are sent based on property changes. In
// order to keep consistency across the cluster, we touch the local
// management status property even if it is locally unchanged for any
@@ -618,7 +624,7 @@ void Cluster::setMgmtStatus(Lock&) {
}
void Cluster::initMapCompleted(Lock& l) {
- // Called on completion of the initial status map.
+ // Called on completion of the initial status map.
QPID_LOG(debug, *this << " initial status map complete. ");
setMgmtStatus(l);
if (state == PRE_INIT) {
@@ -665,6 +671,8 @@ void Cluster::initMapCompleted(Lock& l) {
else { // I can go ready.
discarding = false;
setReady(l);
+ // Must be called *before* memberUpdate so first update will be generated.
+ failoverExchange->setReady();
memberUpdate(l);
updateMgmtMembership(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
@@ -701,8 +709,8 @@ void Cluster::configChange(const MemberId&,
if (initMap.isResendNeeded()) {
mcast.mcastControl(
ClusterInitialStatusBody(
- ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
- store.getState(), store.getShutdownId(),
+ ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId,
+ store.getState(), store.getShutdownId(),
initMap.getFirstConfigStr()
),
self);
@@ -717,6 +725,20 @@ void Cluster::configChange(const MemberId&,
updateMgmtMembership(l); // Update on every config change for consistency
}
+struct ClusterClockTask : public sys::TimerTask {
+ Cluster& cluster;
+ sys::Timer& timer;
+
+ ClusterClockTask(Cluster& cluster, sys::Timer& timer, uint16_t clockInterval)
+ : TimerTask(Duration(clockInterval * TIME_MSEC),"ClusterClock"), cluster(cluster), timer(timer) {}
+
+ void fire() {
+ cluster.sendClockUpdate();
+ setupNextFire();
+ timer.add(this);
+ }
+};
+
void Cluster::becomeElder(Lock&) {
if (elder) return; // We were already the elder.
// We are the oldest, reactive links if necessary
@@ -724,6 +746,8 @@ void Cluster::becomeElder(Lock&) {
elder = true;
broker.getLinks().setPassive(false);
timer->becomeElder();
+
+ clockTimer.add(new ClusterClockTask(*this, clockTimer, settings.clockInterval));
}
void Cluster::makeOffer(const MemberId& id, Lock& ) {
@@ -759,7 +783,7 @@ std::string Cluster::debugSnapshot() {
// point we know the poller has stopped so no poller callbacks will be
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
-//
+//
void Cluster::brokerShutdown() {
sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts.
try { cpg.shutdown(); }
@@ -775,7 +799,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
}
void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active,
- const framing::Uuid& id,
+ const framing::Uuid& id,
framing::cluster::StoreState store,
const framing::Uuid& shutdownId,
const std::string& firstConfig,
@@ -833,6 +857,8 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
else if (updatee == self && url) {
assert(state == JOINER);
state = UPDATEE;
+ acl = broker.getAcl();
+ broker.setAcl(0); // Disable ACL during update
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);
}
@@ -844,7 +870,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l)
if (updatee != self && url) {
QPID_LOG(debug, debugSnapshot());
if (mAgent) mAgent->clusterUpdate();
- // Updatee will call clusterUpdate when update completes
+ // Updatee will call clusterUpdate() via checkUpdateIn() when update completes
}
}
@@ -925,13 +951,15 @@ void Cluster::checkUpdateIn(Lock& l) {
if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
- failoverExchange->setUrls(getUrls(l));
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
memberUpdate(l);
+ // Must be called *after* memberUpdate() to avoid sending an extra update.
+ failoverExchange->setReady();
// NB: don't updateMgmtMembership() here as we are not in the deliver
// thread. It will be updated on delivery of the "ready" we just mcast.
broker.setClusterUpdatee(false);
+ broker.setAcl(acl); // Restore ACL
discarding = false; // OK to set, we're stalled for update.
QPID_LOG(notice, *this << " update complete, starting catch-up.");
QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled.
@@ -941,6 +969,10 @@ void Cluster::checkUpdateIn(Lock& l) {
mAgent->suppress(false); // Enable management output.
mAgent->clusterUpdate();
}
+ // Restore alternate exchange settings on exchanges.
+ broker.getExchanges().eachExchange(
+ boost::bind(&broker::Exchange::recoveryComplete, _1,
+ boost::ref(broker.getExchanges())));
enableClusterSafe(); // Enable cluster-safe assertions
deliverEventQueue.start();
}
@@ -969,7 +1001,7 @@ void Cluster::updateOutDone(Lock& l) {
void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending update: " << e.what());
+ QPID_LOG(error, *this << " error sending update: " << e.what());
updateOutDone(l);
}
@@ -1067,7 +1099,7 @@ void Cluster::memberUpdate(Lock& l) {
void Cluster::updateMgmtMembership(Lock& l) {
if (!mgmtObject) return;
std::vector<Url> urls = getUrls(l);
- mgmtObject->set_clusterSize(urls.size());
+ mgmtObject->set_clusterSize(urls.size());
string urlstr;
for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) {
if (i != urls.begin()) urlstr += ";";
@@ -1114,10 +1146,6 @@ void Cluster::setClusterId(const Uuid& uuid, Lock&) {
QPID_LOG(notice, *this << " cluster-uuid = " << clusterId);
}
-void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
- expiryPolicy->deliverExpire(id);
-}
-
void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) {
// If we see an errorCheck here (rather than in the ErrorCheck
// class) then we have processed succesfully past the point of the
@@ -1155,6 +1183,35 @@ void Cluster::deliverToQueue(const std::string& queue, const std::string& messag
q->deliver(msg);
}
+sys::AbsTime Cluster::getClusterTime() {
+ Mutex::ScopedLock l(lock);
+ return clusterTime;
+}
+
+// This method is called during update on the updatee to set the initial cluster time.
+void Cluster::clock(const uint64_t time) {
+ Mutex::ScopedLock l(lock);
+ clock(time, l);
+}
+
+// called when broadcast message received
+void Cluster::clock(const uint64_t time, Lock&) {
+ clusterTime = AbsTime(EPOCH, time);
+ AbsTime now = AbsTime::now();
+
+ if (!elder) {
+ clusterTimeOffset = Duration(now, clusterTime);
+ }
+}
+
+// called by elder timer to send clock broadcast
+void Cluster::sendClockUpdate() {
+ Mutex::ScopedLock l(lock);
+ int64_t nanosecondsSinceEpoch = Duration(EPOCH, now());
+ nanosecondsSinceEpoch += clusterTimeOffset;
+ mcast.mcastControl(ClusterClockBody(ProtocolVersion(), nanosecondsSinceEpoch), self);
+}
+
bool Cluster::deferDeliveryImpl(const std::string& queue,
const boost::intrusive_ptr<broker::Message>& msg)
{
@@ -1167,4 +1224,12 @@ bool Cluster::deferDeliveryImpl(const std::string& queue,
return true;
}
+bool Cluster::loggable(const AMQFrame& f) {
+ const AMQMethodBody* method = (f.getMethod());
+ if (!method) return true; // Not a method
+ bool isClock = method->amqpClassId() == ClusterClockBody::CLASS_ID
+ && method->amqpMethodId() == ClusterClockBody::METHOD_ID;
+ return !isClock;
+}
+
}} // namespace qpid::cluster