summary refs log tree commit diff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
author Alan Conway <aconway@apache.org> 2009-04-11 14:29:04 +0000
committer Alan Conway <aconway@apache.org> 2009-04-11 14:29:04 +0000
commit 769416f61343b6458529f023164b6ebb837eec3c (patch)
tree d38809248e0d8814734fa89bd097774fa60cc5dd /cpp/src/qpid/cluster
parent 99d89b32f80599872df73a8f1999acd57aa37748 (diff)
download qpid-python-769416f61343b6458529f023164b6ebb837eec3c.tar.gz
Fix issues when cluster is run with persistence enabled.
- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue.
- Enable persistence in cluster tests.
- Correct message status in DeliveryRecord updates.
- Remove the qpid.update queue when the update is complete, so that it does not become persistent.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764204 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r-- cpp/src/qpid/cluster/Cluster.cpp 58
-rw-r--r-- cpp/src/qpid/cluster/Cluster.h 17
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.cpp 43
-rw-r--r-- cpp/src/qpid/cluster/ClusterMap.h 20
-rw-r--r-- cpp/src/qpid/cluster/Connection.cpp 51
-rw-r--r-- cpp/src/qpid/cluster/Connection.h 16
-rw-r--r-- cpp/src/qpid/cluster/ErrorCheck.cpp 120
-rw-r--r-- cpp/src/qpid/cluster/ErrorCheck.h 80
-rw-r--r-- cpp/src/qpid/cluster/EventFrame.h 1
-rw-r--r-- cpp/src/qpid/cluster/LockedConnectionMap.h 2
-rw-r--r-- cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h 3
-rw-r--r-- cpp/src/qpid/cluster/OutputInterceptor.cpp 9
-rw-r--r-- cpp/src/qpid/cluster/OutputInterceptor.h 4
-rw-r--r-- cpp/src/qpid/cluster/UpdateClient.cpp 7
14 files changed, 357 insertions, 74 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 38a41c36e8..ca325dde36 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -36,6 +36,7 @@
#include "qpid/framing/ClusterConfigChangeBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterUpdateOfferBody.h"
@@ -63,6 +64,7 @@ using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;
using namespace qpid::cluster;
+using namespace qpid::framing::cluster;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -77,9 +79,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
- void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
+ void configChange(const std::string& current) { cluster.configChange(member, current, l); }
void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
+ void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -112,7 +115,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
discarding(true),
state(INIT),
lastSize(0),
- lastBroker(false)
+ lastBroker(false),
+ error(*this)
{
mAgent = ManagementAgent::Singleton::getInstance();
if (mAgent != 0){
@@ -195,14 +199,19 @@ void Cluster::leave() {
leave(l);
}
+#define LEAVE_TRY(STMT) try { STMT; } \
+ catch (const std::exception& e) { \
+ QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
+ } do {} while(0)
+
void Cluster::leave(Lock&) {
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- try { broker.shutdown(); }
- catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
- }
+ // Finalize connections now to avoid problems later in destructor.
+ LEAVE_TRY(localConnections.clear());
+ LEAVE_TRY(connections.clear());
+ LEAVE_TRY(broker.shutdown());
}
}
@@ -254,10 +263,22 @@ void Cluster::deliveredEvent(const Event& e) {
QPID_LOG(trace, *this << " DROP: " << e);
}
+void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
+ Mutex::ScopedLock l(lock);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers());
+}
+
// Handler for deliverFrameQueue.
// This thread executes the main logic.
void Cluster::deliveredFrame(const EventFrame& e) {
Mutex::ScopedLock l(lock);
+ // Process each frame through the error checker.
+ error.delivered(e);
+ while (error.canProcess()) // There is a frame ready to process.
+ processFrame(error.getNext(), l);
+}
+
+void Cluster::processFrame(const EventFrame& e, Lock& l) {
if (e.isCluster()) {
QPID_LOG(trace, *this << " DLVR: " << e);
ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
@@ -265,7 +286,8 @@ void Cluster::deliveredFrame(const EventFrame& e) {
throw Exception(QPID_MSG("Invalid cluster control"));
}
else if (state >= CATCHUP) {
- QPID_LOG(trace, *this << " DLVR: " << e);
+ map.incrementFrameSeq();
+ QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -357,8 +379,8 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) {
- bool memberChange = map.configChange(addresses);
+void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
+ bool memberChange = map.configChange(current);
if (state == LEFT) return;
if (!map.isAlive(self)) { // Final config change.
@@ -600,8 +622,13 @@ void Cluster::memberUpdate(Lock& l) {
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.self << "(" << STATE[cluster.state] << ")";
+ static const char* STATE[] = {
+ "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT"
+ };
+ assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
+ o << cluster.self << "(" << STATE[cluster.state];
+ if (cluster.error.isUnresolved()) o << "/error";
+ return o << ")";
}
MemberId Cluster::getId() const {
@@ -635,4 +662,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) {
+ // If we receive an errorCheck here, it's because we have processed past the point
+ // of the error so respond with ERROR_TYPE_NONE
+ assert(map.getFrameSeq() >= frameSeq);
+ if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if it's already NONE.
+ mcast.mcastControl(
+ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index b716e2d781..8a94fc79dd 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -23,6 +23,7 @@
#include "ClusterSettings.h"
#include "Cpg.h"
#include "Decoder.h"
+#include "ErrorCheck.h"
#include "Event.h"
#include "EventFrame.h"
#include "ExpiryPolicy.h"
@@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void deliverFrame(const EventFrame&);
+ // Called in deliverFrame thread to indicate an error from the broker.
+ void flagError(Connection&, ErrorCheck::ErrorType);
+ void connectionError();
+
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
@@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// == Called in deliverFrameQueue thread
void deliveredFrame(const EventFrame&);
+ void processFrame(const EventFrame&, Lock&);
// Cluster controls implement XML methods from cluster.xml.
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&, const std::string& addresses, Lock& l);
+ void configChange(const MemberId&, const std::string& current, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
+ void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&);
void shutdown(const MemberId&, Lock&);
// Helper functions
@@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Decoder decoder;
bool discarding;
+
// Remaining members are protected by lock.
- // FIXME aconway 2009-03-06: Most of these members are also only used in
+
+ // TODO aconway 2009-03-06: Most of these members are also only used in
// deliverFrameQueue thread or during stall. Review and separate members
// that require a lock, drop lock when not needed.
- //
+
mutable sys::Monitor lock;
@@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
-
+ ErrorCheck error;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 9e7232180d..0395ff6382 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -33,6 +33,13 @@ using namespace framing;
namespace cluster {
+ClusterMap::Set ClusterMap::decode(const std::string& s) {
+ Set set;
+ for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8)
+ set.insert(MemberId(std::string(i, i+8)));
+ return set;
+}
+
namespace {
void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) {
@@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
}
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : frameSeq(0) {}
-ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
+ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) {
alive.insert(id);
if (isMember)
members[id] = url;
@@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_)
+ : frameSeq(frameSeq_)
+{
std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
@@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
-}
-
-bool ClusterMap::configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address */*joined*/, int /*nJoined*/)
-{
- cpg_address* a;
- bool memberChange=false;
- for (a = left; a != left+nLeft; ++a) {
- memberChange = memberChange || members.erase(*a);
- joiners.erase(*a);
- }
- alive.clear();
- std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
- return memberChange;
+ b.setFrameSeq(frameSeq);
}
Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
@@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const {
return urls;
}
-ClusterMap::Set ClusterMap::getAlive() const {
- return alive;
+ClusterMap::Set ClusterMap::getAlive() const { return alive; }
+
+ClusterMap::Set ClusterMap::getMembers() const {
+ Set s;
+ std::transform(members.begin(), members.end(), std::inserter(s, s.begin()),
+ boost::bind(&Map::value_type::first, _1));
+ return s;
}
std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
@@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) {
bool ClusterMap::configChange(const std::string& addresses) {
bool memberChange = false;
- Set update;
+ Set update = decode(addresses);
for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8)
update.insert(MemberId(std::string(i, i+8)));
Set removed;
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 4548441442..3359c7c1f3 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -38,26 +38,26 @@
namespace qpid {
namespace cluster {
+typedef std::set<MemberId> MemberSet;
+
/**
- * Map of established cluster members and joiners waiting for an update.
+ * Map of established cluster members and joiners waiting for an update,
+ * along with other cluster state that must be updated.
*/
class ClusterMap {
public:
typedef std::map<MemberId, Url> Map;
typedef std::set<MemberId> Set;
+ static Set decode(const std::string&);
+
ClusterMap();
ClusterMap(const MemberId& id, const Url& url, bool isReady);
- ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states);
+ ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq);
/** Update from config change.
*@return true if member set changed.
*/
- bool configChange(
- cpg_address *current, int nCurrent,
- cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined);
-
bool configChange(const std::string& addresses);
bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
@@ -78,6 +78,7 @@ class ClusterMap {
std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
+ Set getMembers() const;
bool updateRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
@@ -90,11 +91,16 @@ class ClusterMap {
* Utility method to return intersection of two member sets
*/
static Set intersection(const Set& a, const Set& b);
+
+ uint64_t getFrameSeq() { return frameSeq; }
+ uint64_t incrementFrameSeq() { return ++frameSeq; }
+
private:
Url getUrl(const Map& map, const MemberId& id);
Map joiners, members;
Set alive;
+ uint64_t frameSeq;
friend std::ostream& operator<<(std::ostream&, const Map&);
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index aa7d082720..4cb3dec970 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -56,8 +56,16 @@ namespace qpid {
namespace cluster {
using namespace framing;
+using namespace framing::cluster;
+
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
+Connection::NullFrameHandler Connection::nullFrameHandler;
+
+struct NullFrameHandler : public framing::FrameHandler {
+ void handle(framing::AMQFrame&) {}
+};
-NoOpConnectionOutputHandler Connection::discardHandler;
namespace {
sys::AtomicValue<uint64_t> idCounter;
@@ -89,6 +97,8 @@ void Connection::init() {
connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames
connection.setClientThrottling(false); // Disable client throttling, done by active node.
}
+ if (!isCatchUp())
+ connection.setErrorListener(this);
}
void Connection::giveReadCredit(int credit) {
@@ -97,6 +107,7 @@ void Connection::giveReadCredit(int credit) {
}
Connection::~Connection() {
+ connection.setErrorListener(0);
QPID_LOG(debug, cluster << " deleted connection: " << *this);
}
@@ -126,7 +137,7 @@ void Connection::received(framing::AMQFrame& f) {
cluster.addShadowConnection(this);
AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
- output.closeOutput(discardHandler);
+ output.closeOutput();
catchUp = false;
}
else
@@ -156,8 +167,8 @@ void Connection::deliveredFrame(const EventFrame& f) {
{
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // frame control, send frame via SessionState
- broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
+ else { // frame control, send frame via SessionState
+ broker::SessionState* ss = connection.getChannel(currentChannel).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
}
@@ -180,7 +191,7 @@ void Connection::closed() {
// This was a local replicated connection. Multicast a deliver
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
- output.closeOutput(discardHandler);
+ output.closeOutput();
cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
}
}
@@ -275,13 +286,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
connection.setUserId(username);
- // OK to use decoder here because we are stalled for update.
+ // OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
+ connection.setErrorListener(this);
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members));
+ cluster.updateInDone(ClusterMap(joiners, members, frameSeq));
self.second = 0; // Mark this as completed update connection.
}
@@ -305,7 +317,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
}
broker::QueuedMessage Connection::getUpdateMessage() {
- broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE);
+ assert(!updateq->isDurable());
+ broker::QueuedMessage m = updateq->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -342,15 +356,15 @@ void Connection::deliveryRecord(const string& qname,
// If the message was unacked, the newbie broker must place
// it in its messageStore.
- if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled )
+ if ( m.payload && m.payload->isPersistent() && acquired && !ended)
queue->enqueue ( 0, m.payload );
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
- shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
- if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
- q->setPosition(position);
-}
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+ }
void Connection::expiryId(uint64_t id) {
cluster.getExpiryPolicy().setId(id);
@@ -407,7 +421,14 @@ void Connection::queue(const std::string& encoded) {
QPID_LOG(debug, cluster << " decoded queue " << q->getName());
}
-qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+void Connection::sessionError(uint16_t , const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_SESSION);
+
+}
+
+void Connection::connectionError(const std::string& ) {
+ cluster.flagError(*this, ERROR_TYPE_CONNECTION);
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 6434f763a8..49839a456b 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -25,7 +25,6 @@
#include "types.h"
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
-#include "NoOpConnectionOutputHandler.h"
#include "EventFrame.h"
#include "McastFrameHandler.h"
@@ -58,7 +57,8 @@ class Event;
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
- public framing::AMQP_AllOperations::ClusterConnectionHandler
+ public framing::AMQP_AllOperations::ClusterConnectionHandler,
+ private broker::Connection::ErrorListener
{
public:
@@ -120,7 +120,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&);
+ void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -156,6 +156,13 @@ class Connection :
void handle(framing::AMQFrame&) {}
};
+
+ static NullFrameHandler nullFrameHandler;
+
+ // Error listener functions
+ void connectionError(const std::string&);
+ void sessionError(uint16_t channel, const std::string&);
+
void init();
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
@@ -167,8 +174,6 @@ class Connection :
broker::SemanticState& semanticState();
broker::QueuedMessage getUpdateMessage();
- static NoOpConnectionOutputHandler discardHandler;
-
Cluster& cluster;
ConnectionId self;
bool catchUp;
@@ -181,7 +186,6 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
- NullFrameHandler nullFrameHandler;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp
new file mode 100644
index 0000000000..cbe3e3daa4
--- /dev/null
+++ b/cpp/src/qpid/cluster/ErrorCheck.cpp
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ErrorCheck.h"
+#include "EventFrame.h"
+#include "ClusterMap.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterErrorCheckBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
+#include "qpid/log/Statement.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace cluster {
+
+using namespace std;
+using namespace framing;
+using namespace framing::cluster;
+
+ErrorCheck::ErrorCheck(Cluster& c)
+ : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0)
+{}
+
+ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) {
+ copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " "));
+ return o;
+}
+
+void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms)
+{
+ // Detected a local error, inform cluster and set error state.
+ assert(t != ERROR_TYPE_NONE); // Must be an error.
+ assert(type == ERROR_TYPE_NONE); // Can only be called while processing
+ type = t;
+ unresolved = ms;
+ frameSeq = seq;
+ connection = &c;
+ QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection")
+ << " error " << frameSeq << " unresolved: " << unresolved);
+ mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId());
+}
+
+void ErrorCheck::delivered(const EventFrame& e) {
+ if (isUnresolved()) {
+ const ClusterErrorCheckBody* errorCheck =
+ dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod());
+ const ClusterConfigChangeBody* configChange =
+ dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod());
+
+ if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error
+ if (errorCheck->getType() < type) { // my error is worse than his
+ QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId());
+ throw Exception("Aborted by local failure that did not occur on all replicas");
+ }
+ else { // his error is worse/same as mine.
+ QPID_LOG(critical, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId());
+ unresolved.erase(e.getMemberId());
+ checkResolved();
+ }
+ }
+ else {
+ frames.push_back(e); // Only drop matching errorCheck controls.
+ if (configChange) {
+ MemberSet members(ClusterMap::decode(configChange->getCurrent()));
+ MemberSet result;
+ set_intersection(members.begin(), members.end(),
+ unresolved.begin(), unresolved.end(),
+ inserter(result, result.begin()));
+ unresolved.swap(result);
+ checkResolved();
+ }
+ }
+ }
+ else
+ frames.push_back(e);
+}
+
+void ErrorCheck::checkResolved() {
+ if (unresolved.empty()) { // No more potentially conflicted members, we're clear.
+ type = ERROR_TYPE_NONE;
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved.");
+ }
+ else
+ QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved);
+}
+
+EventFrame ErrorCheck::getNext() {
+ assert(canProcess());
+ EventFrame e(frames.front());
+ frames.pop_front();
+ return e;
+}
+
+bool ErrorCheck::canProcess() const {
+ return type == ERROR_TYPE_NONE && !frames.empty();
+}
+
+bool ErrorCheck::isUnresolved() const {
+ return type != ERROR_TYPE_NONE;
+}
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h
new file mode 100644
index 0000000000..97b5f2bffd
--- /dev/null
+++ b/cpp/src/qpid/cluster/ErrorCheck.h
@@ -0,0 +1,80 @@
+#ifndef QPID_CLUSTER_ERRORCHECK_H
+#define QPID_CLUSTER_ERRORCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "Multicaster.h"
+#include "qpid/framing/enum.h"
+#include <boost/function.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace cluster {
+
+class EventFrame;
+class ClusterMap;
+class Cluster;
+class Multicaster;
+class Connection;
+
+/**
+ * Error checking logic.
+ *
+ * When an error occurs stop processing frames and queue them until we
+ * can determine if all nodes experienced the error. If not, we shut down.
+ */
+class ErrorCheck
+{
+ public:
+ typedef std::set<MemberId> MemberSet;
+ typedef framing::cluster::ErrorType ErrorType;
+
+ ErrorCheck(Cluster&);
+
+ /** A local error has occurred */
+ void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&);
+
+ /** Called when a frame is delivered */
+ void delivered(const EventFrame&);
+
+ EventFrame getNext();
+
+ bool canProcess() const;
+ bool isUnresolved() const;
+
+ private:
+ void checkResolved();
+
+ Cluster& cluster;
+ Multicaster& mcast;
+ std::deque<EventFrame> frames;
+ std::set<MemberId> unresolved;
+ uint64_t frameSeq;
+ ErrorType type;
+ Connection* connection;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_ERRORCHECK_H*/
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index d6ff58dd38..aada4c2628 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -45,6 +45,7 @@ struct EventFrame
bool isCluster() const { return connectionId.getNumber() == 0; }
bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
+ MemberId getMemberId() const { return connectionId.getMember(); }
ConnectionId connectionId;
diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h
index 8b2f6dae8e..4df742d6c2 100644
--- a/cpp/src/qpid/cluster/LockedConnectionMap.h
+++ b/cpp/src/qpid/cluster/LockedConnectionMap.h
@@ -52,6 +52,8 @@ class LockedConnectionMap
return 0;
}
+ void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); }
+
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
mutable sys::Mutex lock;
diff --git a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
index 74a376a657..6a30bddf06 100644
--- a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
+++ b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h
@@ -30,8 +30,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
/**
- * Output handler for frames sent to noop connections.
- * Simply discards frames.
+ * Output handler for shadow connections; simply discards frames.
*/
class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler
{
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index cd42446016..da674fa6fd 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -32,8 +32,9 @@ namespace cluster {
using namespace framing;
-OutputInterceptor::OutputInterceptor(
- cluster::Connection& p, sys::ConnectionOutputHandler& h)
+NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
+
+OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
: parent(p), closing(false), next(&h), sent(),
writeEstimate(p.getCluster().getWriteEstimate()),
moreOutput(), doingOutput()
@@ -111,10 +112,10 @@ void OutputInterceptor::sendDoOutput() {
QPID_LOG(trace, parent << "Send doOutput request for " << request);
}
-void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) {
+void OutputInterceptor::closeOutput() {
sys::Mutex::ScopedLock l(lock);
closing = true;
- next = &h;
+ next = &discardHandler;
}
void OutputInterceptor::close() {
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h
index c080a419e1..5000893727 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.h
+++ b/cpp/src/qpid/cluster/OutputInterceptor.h
@@ -23,6 +23,7 @@
*/
#include "WriteEstimate.h"
+#include "NoOpConnectionOutputHandler.h"
#include "qpid/sys/ConnectionOutputHandler.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/sys/LatencyMetric.h"
@@ -53,7 +54,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
// Intercept doOutput requests on Connection.
bool doOutput();
- void closeOutput(sys::ConnectionOutputHandler& h);
+ void closeOutput();
cluster::Connection& parent;
@@ -70,6 +71,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri
WriteEstimate writeEstimate;
bool moreOutput;
bool doingOutput;
+ static NoOpConnectionOutputHandler discardHandler;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index c00b811a20..2696495cb7 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -125,15 +125,19 @@ void UpdateClient::update() {
// Update queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
- session.close();
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ session.queueDelete(arg::queue=UPDATE);
+ session.close();
+
+
ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
+
connection.close();
QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
@@ -202,7 +206,6 @@ class MessageUpdater {
sb.get()->send(transfer, message.payload->getFrames());
if (message.payload->isContentReleased()){
uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize;
-
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
bool morecontent = true;
for (uint64_t offset = 0; morecontent; offset += maxContentSize)