diff options
| author | Alan Conway <aconway@apache.org> | 2009-04-11 14:29:04 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-04-11 14:29:04 +0000 |
| commit | 769416f61343b6458529f023164b6ebb837eec3c (patch) | |
| tree | d38809248e0d8814734fa89bd097774fa60cc5dd /cpp/src/qpid/cluster | |
| parent | 99d89b32f80599872df73a8f1999acd57aa37748 (diff) | |
| download | qpid-python-769416f61343b6458529f023164b6ebb837eec3c.tar.gz | |
Fix issues when cluster is run with persistence enabled.
- Handle partial failures (e.g. due to disk error): failing brokers shut down, others continue.
- Enable persistence in cluster tests.
- Correct message status in DeliveryRecord updates.
- Remove qpid.update queue when update complete - avoid it becoming persistent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@764204 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 58 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.cpp | 120 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ErrorCheck.h | 80 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/EventFrame.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/LockedConnectionMap.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp | 7 |
14 files changed, 357 insertions, 74 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 38a41c36e8..ca325dde36 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -36,6 +36,7 @@ #include "qpid/framing/ClusterConfigChangeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" +#include "qpid/framing/ClusterErrorCheckBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" @@ -63,6 +64,7 @@ using namespace qpid::framing; using namespace qpid::sys; using namespace std; using namespace qpid::cluster; +using namespace qpid::framing::cluster; using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; @@ -77,9 +79,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } - void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } + void configChange(const std::string& current) { cluster.configChange(member, current, l); } void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } + void errorCheck(uint8_t type, uint64_t seq) { cluster.errorCheck(member, type, seq, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -112,7 +115,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : discarding(true), state(INIT), lastSize(0), - lastBroker(false) + lastBroker(false), + error(*this) { mAgent = ManagementAgent::Singleton::getInstance(); if (mAgent != 0){ @@ -195,14 +199,19 @@ void Cluster::leave() { leave(l); } +#define LEAVE_TRY(STMT) try { STMT; } \ + catch (const std::exception& e) { \ + QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ + } do {} while(0) + void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); - try { broker.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); - } + // Finalize connections now now to avoid problems later in destructor. + LEAVE_TRY(localConnections.clear()); + LEAVE_TRY(connections.clear()); + LEAVE_TRY(broker.shutdown()); } } @@ -254,10 +263,22 @@ void Cluster::deliveredEvent(const Event& e) { QPID_LOG(trace, *this << " DROP: " << e); } +void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) { + Mutex::ScopedLock l(lock); + error.error(connection, type, map.getFrameSeq(), map.getMembers()); +} + // Handler for deliverFrameQueue. // This thread executes the main logic. void Cluster::deliveredFrame(const EventFrame& e) { Mutex::ScopedLock l(lock); + // Process each frame through the error checker. + error.delivered(e); + while (error.canProcess()) // There is a frame ready to process. + processFrame(error.getNext(), l); +} + +void Cluster::processFrame(const EventFrame& e, Lock& l) { if (e.isCluster()) { QPID_LOG(trace, *this << " DLVR: " << e); ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); @@ -265,7 +286,8 @@ void Cluster::deliveredFrame(const EventFrame& e) { throw Exception(QPID_MSG("Invalid cluster control")); } else if (state >= CATCHUP) { - QPID_LOG(trace, *this << " DLVR: " << e); + map.incrementFrameSeq(); + QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -357,8 +379,8 @@ void Cluster::setReady(Lock&) { broker.getQueueEvents().enable(); } -void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { - bool memberChange = map.configChange(addresses); +void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) { + bool memberChange = map.configChange(current); if (state == LEFT) return; if (!map.isAlive(self)) { // Final config change. @@ -600,8 +622,13 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.self << "(" << STATE[cluster.state] << ")"; + static const char* STATE[] = { + "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" + }; + assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); + o << cluster.self << "(" << STATE[cluster.state]; + if (cluster.error.isUnresolved()) o << "/error"; + return o << ")"; } MemberId Cluster::getId() const { @@ -635,4 +662,13 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + // If we receive an errorCheck here, it's because we have processed past the point + // of the error so respond with ERROR_TYPE_NONE + assert(map.getFrameSeq() >= frameSeq); + if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its already NONE. + mcast.mcastControl( + ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index b716e2d781..8a94fc79dd 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -23,6 +23,7 @@ #include "ClusterSettings.h" #include "Cpg.h" #include "Decoder.h" +#include "ErrorCheck.h" #include "Event.h" #include "EventFrame.h" #include "ExpiryPolicy.h" @@ -105,6 +106,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliverFrame(const EventFrame&); + // Called in deliverFrame thread to indicate an error from the broker. + void flagError(Connection&, ErrorCheck::ErrorType); + void connectionError(); + // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } @@ -132,13 +137,15 @@ class Cluster : private Cpg::Handler, public management::Manageable { // == Called in deliverFrameQueue thread void deliveredFrame(const EventFrame&); + void processFrame(const EventFrame&, Lock&); // Cluster controls implement XML methods from cluster.xml. void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, const std::string& addresses, Lock& l); + void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); + void errorCheck(const MemberId&, uint8_t, uint64_t, Lock&); void shutdown(const MemberId&, Lock&); // Helper functions @@ -216,11 +223,13 @@ class Cluster : private Cpg::Handler, public management::Manageable { Decoder decoder; bool discarding; + // Remaining members are protected by lock. - // FIXME aconway 2009-03-06: Most of these members are also only used in + + // TODO aconway 2009-03-06: Most of these members are also only used in // deliverFrameQueue thread or during stall. Review and separate members // that require a lock, drop lock when not needed. - // + mutable sys::Monitor lock; @@ -243,7 +252,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - + ErrorCheck error; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index 9e7232180d..0395ff6382 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -33,6 +33,13 @@ using namespace framing; namespace cluster { +ClusterMap::Set ClusterMap::decode(const std::string& s) { + Set set; + for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) + set.insert(MemberId(std::string(i, i+8))); + return set; +} + namespace { void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { @@ -54,9 +61,9 @@ void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) { } -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : frameSeq(0) {} -ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { +ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) : frameSeq(0) { alive.insert(id); if (isMember) members[id] = url; @@ -64,7 +71,9 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, uint64_t frameSeq_) + : frameSeq(frameSeq_) +{ std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } @@ -78,22 +87,7 @@ void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); -} - -bool ClusterMap::configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address */*joined*/, int /*nJoined*/) -{ - cpg_address* a; - bool memberChange=false; - for (a = left; a != left+nLeft; ++a) { - memberChange = memberChange || members.erase(*a); - joiners.erase(*a); - } - alive.clear(); - std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); - return memberChange; + b.setFrameSeq(frameSeq); } Url ClusterMap::getUrl(const Map& map, const MemberId& id) { @@ -123,8 +117,13 @@ std::vector<Url> ClusterMap::memberUrls() const { return urls; } -ClusterMap::Set ClusterMap::getAlive() const { - return alive; +ClusterMap::Set ClusterMap::getAlive() const { return alive; } + +ClusterMap::Set ClusterMap::getMembers() const { + Set s; + std::transform(members.begin(), members.end(), std::inserter(s, s.begin()), + boost::bind(&Map::value_type::first, _1)); + return s; } std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) { @@ -158,7 +157,7 @@ bool ClusterMap::ready(const MemberId& id, const Url& url) { bool ClusterMap::configChange(const std::string& addresses) { bool memberChange = false; - Set update; + Set update = decode(addresses); for (std::string::const_iterator i = addresses.begin(); i < addresses.end(); i += 8) update.insert(MemberId(std::string(i, i+8))); Set removed; diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 4548441442..3359c7c1f3 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -38,26 +38,26 @@ namespace qpid { namespace cluster { +typedef std::set<MemberId> MemberSet; + /** - * Map of established cluster members and joiners waiting for an update. + * Map of established cluster members and joiners waiting for an update, + * along with other cluster state that must be updated. */ class ClusterMap { public: typedef std::map<MemberId, Url> Map; typedef std::set<MemberId> Set; + static Set decode(const std::string&); + ClusterMap(); ClusterMap(const MemberId& id, const Url& url, bool isReady); - ClusterMap(const framing::FieldTable& urls, const framing::FieldTable& states); + ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, uint64_t frameSeq); /** Update from config change. *@return true if member set changed. */ - bool configChange( - cpg_address *current, int nCurrent, - cpg_address *left, int nLeft, - cpg_address *joined, int nJoined); - bool configChange(const std::string& addresses); bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } @@ -78,6 +78,7 @@ class ClusterMap { std::vector<std::string> memberIds() const; std::vector<Url> memberUrls() const; Set getAlive() const; + Set getMembers() const; bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ @@ -90,11 +91,16 @@ class ClusterMap { * Utility method to return intersection of two member sets */ static Set intersection(const Set& a, const Set& b); + + uint64_t getFrameSeq() { return frameSeq; } + uint64_t incrementFrameSeq() { return ++frameSeq; } + private: Url getUrl(const Map& map, const MemberId& id); Map joiners, members; Set alive; + uint64_t frameSeq; friend std::ostream& operator<<(std::ostream&, const Map&); friend std::ostream& operator<<(std::ostream&, const ClusterMap&); diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index aa7d082720..4cb3dec970 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -56,8 +56,16 @@ namespace qpid { namespace cluster { using namespace framing; +using namespace framing::cluster; + +qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); + +Connection::NullFrameHandler Connection::nullFrameHandler; + +struct NullFrameHandler : public framing::FrameHandler { + void handle(framing::AMQFrame&) {} +}; -NoOpConnectionOutputHandler Connection::discardHandler; namespace { sys::AtomicValue<uint64_t> idCounter; @@ -89,6 +97,8 @@ void Connection::init() { connection.setClusterOrderOutput(nullFrameHandler); // Passive, discard cluster-order frames connection.setClientThrottling(false); // Disable client throttling, done by active node. } + if (!isCatchUp()) + connection.setErrorListener(this); } void Connection::giveReadCredit(int credit) { @@ -97,6 +107,7 @@ void Connection::giveReadCredit(int credit) { } Connection::~Connection() { + connection.setErrorListener(0); QPID_LOG(debug, cluster << " deleted connection: " << *this); } @@ -126,7 +137,7 @@ void Connection::received(framing::AMQFrame& f) { cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); - output.closeOutput(discardHandler); + output.closeOutput(); catchUp = false; } else @@ -156,8 +167,8 @@ void Connection::deliveredFrame(const EventFrame& f) { { if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); + else { // frame control, send frame via SessionState + broker::SessionState* ss = connection.getChannel(currentChannel).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } } @@ -180,7 +191,7 @@ void Connection::closed() { // This was a local replicated connection. Multicast a deliver // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. - output.closeOutput(discardHandler); + output.closeOutput(); cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); } } @@ -275,13 +286,14 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); self = shadowId; connection.setUserId(username); - // OK to use decoder here because we are stalled for update. + // OK to use decoder here because cluster is stalled for update. cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); + connection.setErrorListener(this); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members)); + cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); self.second = 0; // Mark this as completed update connection. } @@ -305,7 +317,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { } broker::QueuedMessage Connection::getUpdateMessage() { - broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); + assert(!updateq->isDurable()); + broker::QueuedMessage m = updateq->get(); if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -342,15 +356,15 @@ void Connection::deliveryRecord(const string& qname, // If the message was unacked, the newbie broker must place // it in its messageStore. - if ( m.payload && m.payload->isPersistent() && !completed && !ended && !accepted && !cancelled ) + if ( m.payload && m.payload->isPersistent() && acquired && !ended) queue->enqueue ( 0, m.payload ); } void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); - if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); - q->setPosition(position); -} + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); + } void Connection::expiryId(uint64_t id) { cluster.getExpiryPolicy().setId(id); @@ -407,7 +421,14 @@ void Connection::queue(const std::string& encoded) { QPID_LOG(debug, cluster << " decoded queue " << q->getName()); } -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); +void Connection::sessionError(uint16_t , const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_SESSION); + +} + +void Connection::connectionError(const std::string& ) { + cluster.flagError(*this, ERROR_TYPE_CONNECTION); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 6434f763a8..49839a456b 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -25,7 +25,6 @@ #include "types.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" -#include "NoOpConnectionOutputHandler.h" #include "EventFrame.h" #include "McastFrameHandler.h" @@ -58,7 +57,8 @@ class Event; class Connection : public RefCounted, public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler + public framing::AMQP_AllOperations::ClusterConnectionHandler, + private broker::Connection::ErrorListener { public: @@ -120,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&); + void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -156,6 +156,13 @@ class Connection : void handle(framing::AMQFrame&) {} }; + + static NullFrameHandler nullFrameHandler; + + // Error listener functions + void connectionError(const std::string&); + void sessionError(uint16_t channel, const std::string&); + void init(); bool checkUnsupported(const framing::AMQBody& body); void deliverClose(); @@ -167,8 +174,6 @@ class Connection : broker::SemanticState& semanticState(); broker::QueuedMessage getUpdateMessage(); - static NoOpConnectionOutputHandler discardHandler; - Cluster& cluster; ConnectionId self; bool catchUp; @@ -181,7 +186,6 @@ class Connection : boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; - NullFrameHandler nullFrameHandler; static qpid::sys::AtomicValue<uint64_t> catchUpId; diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp new file mode 100644 index 0000000000..cbe3e3daa4 --- /dev/null +++ b/cpp/src/qpid/cluster/ErrorCheck.cpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ErrorCheck.h" +#include "EventFrame.h" +#include "ClusterMap.h" +#include "Cluster.h" +#include "qpid/framing/ClusterErrorCheckBody.h" +#include "qpid/framing/ClusterConfigChangeBody.h" +#include "qpid/log/Statement.h" + +#include <algorithm> + +namespace qpid { +namespace cluster { + +using namespace std; +using namespace framing; +using namespace framing::cluster; + +ErrorCheck::ErrorCheck(Cluster& c) + : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) +{} + +ostream& operator<<(ostream& o, ErrorCheck::MemberSet ms) { + copy(ms.begin(), ms.end(), ostream_iterator<MemberId>(o, " ")); + return o; +} + +void ErrorCheck::error(Connection& c, ErrorType t, uint64_t seq, const MemberSet& ms) +{ + // Detected a local error, inform cluster and set error state. + assert(t != ERROR_TYPE_NONE); // Must be an error. + assert(type == ERROR_TYPE_NONE); // Can only be called while processing + type = t; + unresolved = ms; + frameSeq = seq; + connection = &c; + QPID_LOG(debug, cluster << (type == ERROR_TYPE_SESSION ? " Session" : " Connection") + << " error " << frameSeq << " unresolved: " << unresolved); + mcast.mcastControl(ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); +} + +void ErrorCheck::delivered(const EventFrame& e) { + if (isUnresolved()) { + const ClusterErrorCheckBody* errorCheck = + dynamic_cast<const ClusterErrorCheckBody*>(e.frame.getMethod()); + const ClusterConfigChangeBody* configChange = + dynamic_cast<const ClusterConfigChangeBody*>(e.frame.getMethod()); + + if (errorCheck && errorCheck->getFrameSeq() == frameSeq) { // Same error + if (errorCheck->getType() < type) { // my error is worse than his + QPID_LOG(critical, cluster << " Error " << frameSeq << " did not occur on " << e.getMemberId()); + throw Exception("Aborted by local failure that did not occur on all replicas"); + } + else { // his error is worse/same as mine. + QPID_LOG(critical, cluster << " Error " << frameSeq << " outcome agrees with " << e.getMemberId()); + unresolved.erase(e.getMemberId()); + checkResolved(); + } + } + else { + frames.push_back(e); // Only drop matching errorCheck controls. + if (configChange) { + MemberSet members(ClusterMap::decode(configChange->getCurrent())); + MemberSet result; + set_intersection(members.begin(), members.end(), + unresolved.begin(), unresolved.end(), + inserter(result, result.begin())); + unresolved.swap(result); + checkResolved(); + } + } + } + else + frames.push_back(e); +} + +void ErrorCheck::checkResolved() { + if (unresolved.empty()) { // No more potentially conflicted members, we're clear. + type = ERROR_TYPE_NONE; + QPID_LOG(debug, cluster << " Error " << frameSeq << " resolved."); + } + else + QPID_LOG(debug, cluster << " Error " << frameSeq << " still unresolved: " << unresolved); +} + +EventFrame ErrorCheck::getNext() { + assert(canProcess()); + EventFrame e(frames.front()); + frames.pop_front(); + return e; +} + +bool ErrorCheck::canProcess() const { + return type == ERROR_TYPE_NONE && !frames.empty(); +} + +bool ErrorCheck::isUnresolved() const { + return type != ERROR_TYPE_NONE; +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h new file mode 100644 index 0000000000..97b5f2bffd --- /dev/null +++ b/cpp/src/qpid/cluster/ErrorCheck.h @@ -0,0 +1,80 @@ +#ifndef QPID_CLUSTER_ERRORCHECK_H +#define QPID_CLUSTER_ERRORCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "Multicaster.h" +#include "qpid/framing/enum.h" +#include <boost/function.hpp> +#include <deque> +#include <set> + +namespace qpid { +namespace cluster { + +class EventFrame; +class ClusterMap; +class Cluster; +class Multicaster; +class Connection; + +/** + * Error checking logic. + * + * When an error occurs stop processing frames and queue them until we + * can determine if all nodes experienced the error. If not, we shut down. + */ +class ErrorCheck +{ + public: + typedef std::set<MemberId> MemberSet; + typedef framing::cluster::ErrorType ErrorType; + + ErrorCheck(Cluster&); + + /** A local error has occured */ + void error(Connection&, ErrorType, uint64_t frameSeq, const MemberSet&); + + /** Called when a frame is delivered */ + void delivered(const EventFrame&); + + EventFrame getNext(); + + bool canProcess() const; + bool isUnresolved() const; + + private: + void checkResolved(); + + Cluster& cluster; + Multicaster& mcast; + std::deque<EventFrame> frames; + std::set<MemberId> unresolved; + uint64_t frameSeq; + ErrorType type; + Connection* connection; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_ERRORCHECK_H*/ diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index d6ff58dd38..aada4c2628 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -45,6 +45,7 @@ struct EventFrame bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } + MemberId getMemberId() const { return connectionId.getMember(); } ConnectionId connectionId; diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h index 8b2f6dae8e..4df742d6c2 100644 --- a/cpp/src/qpid/cluster/LockedConnectionMap.h +++ b/cpp/src/qpid/cluster/LockedConnectionMap.h @@ -52,6 +52,8 @@ class LockedConnectionMap return 0; } + void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); } + private: typedef std::map<ConnectionId, ConnectionPtr> Map; mutable sys::Mutex lock; diff --git a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h index 74a376a657..6a30bddf06 100644 --- a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h +++ b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h @@ -30,8 +30,7 @@ namespace framing { class AMQFrame; } namespace cluster { /** - * Output handler for frames sent to noop connections. - * Simply discards frames. + * Output handler shadow connections, simply discards frames. */ class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler { diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index cd42446016..da674fa6fd 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -32,8 +32,9 @@ namespace cluster { using namespace framing; -OutputInterceptor::OutputInterceptor( - cluster::Connection& p, sys::ConnectionOutputHandler& h) +NoOpConnectionOutputHandler OutputInterceptor::discardHandler; + +OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) : parent(p), closing(false), next(&h), sent(), writeEstimate(p.getCluster().getWriteEstimate()), moreOutput(), doingOutput() @@ -111,10 +112,10 @@ void OutputInterceptor::sendDoOutput() { QPID_LOG(trace, parent << "Send doOutput request for " << request); } -void OutputInterceptor::closeOutput(sys::ConnectionOutputHandler& h) { +void OutputInterceptor::closeOutput() { sys::Mutex::ScopedLock l(lock); closing = true; - next = &h; + next = &discardHandler; } void OutputInterceptor::close() { diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h index c080a419e1..5000893727 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ b/cpp/src/qpid/cluster/OutputInterceptor.h @@ -23,6 +23,7 @@ */ #include "WriteEstimate.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/ConnectionOutputHandler.h" #include "qpid/broker/ConnectionFactory.h" #include "qpid/sys/LatencyMetric.h" @@ -53,7 +54,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri // Intercept doOutput requests on Connection. bool doOutput(); - void closeOutput(sys::ConnectionOutputHandler& h); + void closeOutput(); cluster::Connection& parent; @@ -70,6 +71,7 @@ class OutputInterceptor : public sys::ConnectionOutputHandler, sys::LatencyMetri WriteEstimate writeEstimate; bool moreOutput; bool doingOutput; + static NoOpConnectionOutputHandler discardHandler; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index c00b811a20..2696495cb7 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -125,15 +125,19 @@ void UpdateClient::update() { // Update queue is used to transfer acquired messages that are no longer on their original queue. session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); - session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + session.queueDelete(arg::queue=UPDATE); + session.close(); + + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } @@ -202,7 +206,6 @@ class MessageUpdater { sb.get()->send(transfer, message.payload->getFrames()); if (message.payload->isContentReleased()){ uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); bool morecontent = true; for (uint64_t offset = 0; morecontent; offset += maxContentSize) |
