diff options
| author | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-09-12 18:07:47 +0000 |
| commit | ebb59131b198b693c5774dd51656fec520ddd770 (patch) | |
| tree | 688cbbfe3aae8874156cd2ac2bf9430ef3af19df /cpp/src/qpid | |
| parent | ad8e400b786c5868d4e0d2aa880625240e44e311 (diff) | |
| download | qpid-python-ebb59131b198b693c5774dd51656fec520ddd770.tar.gz | |
Added ClusterMap and test. Moved PollableCondition, PollableQueue to sys.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694758 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 82 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 122 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 86 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cpg.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Event.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Event.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/types.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h | 56 |
14 files changed, 361 insertions, 58 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 07ed4596e0..9db2a61a82 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterJoiningBody.h" +#include "qpid/framing/ClusterUrlNoticeBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,7 +50,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void joining(const std::string& u) { cluster.joining (member, u); } + void urlNotice(const std::string& u) { cluster.urlNotice(member, u); } void ready() { cluster.ready(member); } void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { @@ -58,6 +58,11 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { } bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } + + virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) { + // FIXME aconway 2008-09-12: TODO + } + }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -72,13 +77,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : 0, // write boost::bind(&Cluster::disconnect, this, _1) // disconnect ), - deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) + connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), + state(DISCARD) { QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); - deliverQueue.start(poller); + connectionEventQueue.start(poller); cpgDispatchHandle.startWatch(poller); } @@ -103,6 +109,7 @@ void Cluster::erase(ConnectionId id) { void Cluster::leave() { QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); + // Cluster will shut down in configChange when the cluster knows we've left. } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -172,8 +179,23 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); - QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: - deliverQueue.push(Event::delivered(from, msg, msg_len)); + Event e = Event::delivered(from, msg, msg_len); + QPID_LOG(trace, "Cluster deliver: " << e); + + // Process cluster controls immediately + if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control + Buffer buf(e); + AMQFrame frame; + while (frame.decode(buf)) + if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) + throw Exception("Invalid cluster control"); + } + else { // Process connection controls & data via the connectionEventQueue. + if (state != DISCARD) { + e.setConnection(getConnection(e.getConnectionId())); + connectionEventQueue.push(e); + } + } } catch (const std::exception& e) { // FIXME aconway 2008-01-30: exception handling. @@ -183,24 +205,15 @@ void Cluster::deliver( } } -void Cluster::deliverEvent(const Event& e) { - QPID_LOG(trace, "Delivered: " << e); +void Cluster::connectionEvent(const Event& e) { Buffer buf(e); - if (e.getConnection().getConnectionPtr() == 0) { // Cluster control + assert(e.getConnection()); + if (e.getType() == DATA) + e.getConnection()->deliverBuffer(buf); + else { // control AMQFrame frame; - while (frame.decode(buf)) - if (!ClusterOperations(*this, e.getConnection().getMember()).invoke(frame)) - throw Exception("Invalid cluster control"); - } - else { // Connection data or control - boost::intrusive_ptr<Connection> c = getConnection(e.getConnection()); - if (e.getType() == DATA) - c->deliverBuffer(buf); - else { // control - AMQFrame frame; - while (frame.decode(buf)) - c->deliver(frame); - } + while (frame.decode(buf)) + e.getConnection()->deliver(frame); } } @@ -239,7 +252,7 @@ void Cluster::configChange( if (nJoined) // Notfiy new members of my URL. mcastFrame( - AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), + AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); if (find(left, left+nLeft, self) != left+nLeft) { @@ -266,8 +279,15 @@ void Cluster::disconnect(sys::DispatchHandle& ) { broker.shutdown(); } -void Cluster::joining(const MemberId& m, const string& url) { +void Cluster::urlNotice(const MemberId& m, const string& url) { + //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also. + //FIXME aconway 2008-09-11: Note multiple meanings of my own notice - + //from DISCARD->STALL and from STALL->READY via map. + QPID_LOG(info, "Cluster member " << m << " has URL " << url); + // My brain dump is up to this point, stall till it is complete. + if (m == self && state == DISCARD) + state = STALL; urls.insert(UrlMap::value_type(m,Url(url))); } @@ -289,4 +309,18 @@ void Cluster::shutdown() { broker::Broker& Cluster::getBroker(){ return broker; } +void Cluster::stall() { + // Stop processing connection events. We still process config changes + // and cluster controls in deliver() + + // FIXME aconway 2008-09-11: Flow control, we should slow down or + // stop reading from local connections while stalled to avoid an + // unbounded queue. + connectionEventQueue.stop(); +} + +void Cluster::unStall() { + connectionEventQueue.start(poller); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 3a254684ad..5187cb08e7 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -75,11 +75,14 @@ class Cluster : private Cpg::Handler /** Leave the cluster */ void leave(); - void joining(const MemberId&, const std::string& url); + void urlNotice(const MemberId&, const std::string& url); void ready(const MemberId&); MemberId getSelf() const { return self; } + void stall(); + void unStall(); + void shutdown(); broker::Broker& getBroker(); @@ -88,15 +91,13 @@ class Cluster : private Cpg::Handler typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; + enum State { + DISCARD, // Initially discard connection events up to my own join message. + READY, // Normal processing. + STALL // Stalled while a new member joins. + }; - boost::function<void()> shutdownNext; - - /** Handle a delivered frame */ - void deliverFrame(framing::AMQFrame&, const ConnectionId&); - - void deliverBuffer(const char*, size_t, const ConnectionId&); - - void deliverEvent(const Event&); + void connectionEvent(const Event&); /** CPG deliver callback. */ void deliver( @@ -136,7 +137,8 @@ class Cluster : private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - EventQueue deliverQueue; + EventQueue connectionEventQueue; + State state; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp new file mode 100644 index 0000000000..b0c45ad625 --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -0,0 +1,122 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ClusterMap.h" +#include "qpid/Url.h" +#include "qpid/framing/FieldTable.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <functional> + +namespace qpid { +using namespace framing; + +namespace cluster { + +ClusterMap::ClusterMap() {} + +MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) { + if (isMember(id)) return MemberId(); // Ignore notices from established members. + if (isDumpee(id)) { + // Dumpee caught up, graduate to member with new URL and remove dumper from list. + dumpees.erase(id); + members[id] = url; + } + else if (members.empty()) { + // First in cluster, congratulations! + members[id] = url; + } + else { + // New member needs brain dump. + MemberId dumper = nextDumper(); + Dumpee& d = dumpees[id]; + d.url = url; + d.dumper = dumper; + return dumper; + } + return MemberId(); +} + +MemberId ClusterMap::nextDumper() const { + // Choose the first member in member-id order of the group that + // has the least number of dumps-in-progress. + assert(!members.empty()); + MemberId dumper = members.begin()->first; + int minDumps = dumps(dumper); + MemberMap::const_iterator i = ++members.begin(); + while (i != members.end()) { + int d = dumps(i->first); + if (d < minDumps) { + minDumps = d; + dumper = i->first; + } + ++i; + } + return dumper; +} + +void ClusterMap::leave(const MemberId& id) { + if (isDumpee(id)) + dumpees.erase(id); + if (isMember(id)) { + members.erase(id); + DumpeeMap::iterator i = dumpees.begin(); + while (i != dumpees.end()) { + if (i->second.dumper == id) dumpees.erase(i++); + else ++i; + } + } +} + +struct ClusterMap::MatchDumper { + MemberId d; + MatchDumper(const MemberId& i) : d(i) {} + bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; } +}; + +int ClusterMap::dumps(const MemberId& id) const { + return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id)); +} + +void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); } + +framing::ClusterMapBody ClusterMap::toControl() const { + framing::ClusterMapBody b; + for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i) + b.getMembers().setString(i->first.str(), i->second.str()); + for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i) { + b.getDumpees().setString(i->first.str(), i->second.url.str()); + b.getDumps().setString(i->first.str(), i->second.dumper.str()); + } + return b; +} + +void ClusterMap::fromControl(const framing::ClusterMapBody& b) { + *this = ClusterMap(); // Reset any current contents. + FieldTable::ValueMap::const_iterator i; + for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i) + members[i->first] = Url(i->second->get<std::string>()); + for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i) + dumpees[i->first].url = Url(i->second->get<std::string>()); + for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i) + dumpees[i->first].dumper = MemberId(i->second->get<std::string>()); +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h new file mode 100644 index 0000000000..7695ebeabb --- /dev/null +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -0,0 +1,86 @@ +#ifndef QPID_CLUSTER_CLUSTERMAP_H +#define QPID_CLUSTER_CLUSTERMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/framing/ClusterMapBody.h" +#include "qpid/Url.h" +#include <boost/optional.hpp> +#include <vector> +#include <map> + +namespace qpid { +namespace cluster { + +/** + * Map of established cluster members and brain-dumps in progress. + * A dumper is an established member that is sending catch-up data. + * A dumpee is an aspiring member that is receiving catch-up data. + */ +class ClusterMap +{ + public: + ClusterMap(); + + /** Update map for url-notice event. + *@param from Member that sent the notice. + *@param url URL for from. + *@return MemberId of member that should dump to URL, or a null + * MemberId() if no dump is needed. + */ + MemberId urlNotice(const MemberId& from, const Url& url); + + /** Dump failed notice */ + void dumpFailed(const MemberId&); + + /** Update map for leave event */ + void leave(const MemberId&); + + /** Number of unfinished dumps for member. */ + int dumps(const MemberId&) const; + + /** Convert map contents to a cluster control body. */ + framing::ClusterMapBody toControl() const; + + /** Initialize map contents from a cluster control body. */ + void fromControl(const framing::ClusterMapBody&); + + size_t memberCount() const { return members.size(); } + size_t dumpeeCount() const { return dumpees.size(); } + bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } + bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); } + + private: + struct Dumpee { Url url; MemberId dumper; }; + typedef std::map<MemberId, Url> MemberMap; + typedef std::map<MemberId, Dumpee> DumpeeMap; + struct MatchDumper; + + MemberId nextDumper() const; + + MemberMap members; + DumpeeMap dumpees; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/ diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index f4128634a6..8457467196 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -76,7 +76,6 @@ struct ClusterPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. - QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 68d1b16dfa..00d3901886 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "Cluster.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/Invoker.h" #include "qpid/framing/AllInvoker.h" diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index a30350585f..37ff2ac6b4 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -23,9 +23,9 @@ */ #include "types.h" -#include "Cluster.h" #include "WriteEstimate.h" #include "OutputInterceptor.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/broker/Connection.h" #include "qpid/amqp_0_10/Connection.h" @@ -39,6 +39,7 @@ namespace qpid { namespace framing { class AMQFrame; } namespace cluster { +class Cluster; /** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 6179eab724..ed046f2ede 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -20,6 +20,7 @@ */ #include "ConnectionCodec.h" #include "Connection.h" +#include "Cluster.h" #include "ProxyInputHandler.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp index 96a5b3da43..2a77fa437a 100644 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ b/cpp/src/qpid/cluster/Cpg.cpp @@ -183,4 +183,15 @@ ostream& operator<<(ostream& o, const ConnectionId& c) { return o << c.first << "-" << c.second; } +std::string MemberId::str() const { + char s[8]; + reinterpret_cast<uint32_t&>(s[0]) = htonl(first); + reinterpret_cast<uint32_t&>(s[4]) = htonl(second); + return std::string(s,8); +} + +MemberId::MemberId(const std::string& s) { + first = ntohl(reinterpret_cast<const uint32_t&>(s[0])); + second = ntohl(reinterpret_cast<const uint32_t&>(s[4])); +} }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 4dfb7ab400..89c6268d7f 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -34,7 +34,7 @@ using framing::Buffer; const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t); Event::Event(EventType t, const ConnectionId c, const size_t s) - : type(t), connection(c), size(s), data(RefCountedBuffer::create(s)) {} + : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {} Event Event::delivered(const MemberId& m, void* d, size_t s) { Buffer buf(static_cast<char*>(d), s); @@ -50,7 +50,7 @@ void Event::mcast (const Cpg::Name& name, Cpg& cpg) const { char header[OVERHEAD]; Buffer b(header, OVERHEAD); b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connection.getConnectionPtr())); + b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr())); iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), getSize() } }; cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov)); } @@ -61,7 +61,7 @@ Event::operator Buffer() const { static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, const Event& e) { - o << "[event: " << e.getConnection() + o << "[event: " << e.getConnectionId() << " " << EVENT_TYPE_NAMES[e.getType()] << " " << e.getSize() << " bytes: "; std::ostream_iterator<char> oi(o,""); diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index d0e12831f4..a0f9bc0e49 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -24,6 +24,7 @@ #include "types.h" #include "Cpg.h" +#include "Connection.h" #include "qpid/RefCountedBuffer.h" #include "qpid/framing/Buffer.h" #include <iosfwd> @@ -39,7 +40,7 @@ namespace cluster { * Events are sent to/received from the cluster. * Refcounted so they can be stored on queues. */ -struct Event { +class Event { public: /** Create an event to mcast with a buffer of size bytes. */ Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t size=0); @@ -50,17 +51,21 @@ struct Event { void mcast(const Cpg::Name& name, Cpg& cpg) const; EventType getType() const { return type; } - ConnectionId getConnection() const { return connection; } + ConnectionId getConnectionId() const { return connectionId; } size_t getSize() const { return size; } char* getData() { return data; } const char* getData() const { return data; } + boost::intrusive_ptr<Connection> getConnection() const { return connection; } + void setConnection(const boost::intrusive_ptr<Connection>& c) { connection=c; } + operator framing::Buffer() const; private: static const size_t OVERHEAD; EventType type; - ConnectionId connection; + ConnectionId connectionId; + boost::intrusive_ptr<Connection> connection; size_t size; RefCountedBuffer::pointer data; }; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index ae021a9c4a..82b0d3f077 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -20,6 +20,7 @@ */ #include "OutputInterceptor.h" #include "Connection.h" +#include "Cluster.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 0cd6f1afbb..d62ad62b49 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -23,6 +23,8 @@ */ #include <utility> #include <iosfwd> +#include <string> + #include <stdint.h> extern "C" { @@ -39,10 +41,15 @@ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { - MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} + explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} + MemberId(const std::string&); // Decode from string. uint32_t getNode() const { return first; } uint32_t getPid() const { return second; } + operator bool() const { return first || second; } + + // Encode as string, network byte order. + std::string str() const; }; inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); } @@ -55,6 +62,13 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { Connection* getConnectionPtr() const { return second; } }; +/** State of a cluster member */ +enum State { + DISCARD, // Initially discard connection events up to my own join message. + STALL, // All members stall while a new member joins. + READY // Normal processing. +}; + std::ostream& operator<<(std::ostream&, const ConnectionId&); }} // namespace qpid::cluster diff --git a/cpp/src/qpid/sys/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 2e5d3a0d3d..153ae31135 100644 --- a/cpp/src/qpid/sys/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -24,7 +24,7 @@ #include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" #include <boost/function.hpp> #include <boost/bind.hpp> #include <algorithm> @@ -54,58 +54,84 @@ class PollableQueue { /** Callback to process a range of items. */ typedef boost::function<void (const iterator&, const iterator&)> Callback; - /** Functor tempalate to create a Callback from a functor that handles a single item. */ + /** @see forEach() */ template <class F> struct ForEach { F handleOne; ForEach(const F& f) : handleOne(f) {} void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } }; - /** Function to create ForEach instances */ + + /** Create a range callback from a functor that processes a single item. */ template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } /** When the queue is selected by the poller, values are passed to callback cb. */ explicit PollableQueue(const Callback& cb); /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } + void push(const T& t); /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); } + void start(const boost::shared_ptr<sys::Poller>& poller); - /** Stop polling. */ - void stop() { handle.stopWatch(); } + /** Stop polling and wait for the current callback, if any, to complete. */ + void stop(); private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; + typedef sys::Monitor::ScopedLock ScopedLock; + typedef sys::Monitor::ScopedUnlock ScopedUnlock; void dispatch(sys::DispatchHandle&); - sys::Mutex lock; + sys::Monitor lock; Callback callback; PollableCondition condition; sys::DispatchHandle handle; Queue queue; Queue batch; + bool dispatching, stopped; }; template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) + handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0), + dispatching(false), stopped(true) {} +template <class T> void PollableQueue<T>::start(const boost::shared_ptr<sys::Poller>& poller) { + ScopedLock l(lock); + stopped = false; + handle.startWatch(poller); +} + +template <class T> void PollableQueue<T>::push(const T& t) { + ScopedLock l(lock); + queue.push_back(t); + condition.set(); +} + template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); + ScopedLock l(lock); + if (stopped) return; + dispatching = true; condition.clear(); + batch.clear(); + batch.swap(queue); // Snapshot of current queue contents. { // Process outside the lock to allow concurrent push. ScopedUnlock u(lock); callback(batch.begin(), batch.end()); - h.rewatch(); } batch.clear(); + dispatching = false; + if (stopped) lock.notifyAll(); + else h.rewatch(); +} + +template <class T> void PollableQueue<T>::stop() { + ScopedLock l(lock); + handle.stopWatch(); + stopped = true; + while (dispatching) lock.wait(); } }} // namespace qpid::sys |
