diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 134 |
1 files changed, 131 insertions, 3 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 1f39fe9ae9..8a93773718 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -16,6 +16,74 @@ * */ +/** CLUSTER IMPLEMENTATION OVERVIEW + * + * The cluster works on the principle that if all members of the + * cluster receive identical input, they will all produce identical + * results. cluster::Connections intercept data received from clients + * and multicast it via CPG. The data is processed (passed to the + * broker::Connection) only when it is received from CPG in cluster + * order. Each cluster member has Connection objects for directly + * connected clients and "shadow" Connection objects for connections + * to other members. + * + * This assumes that all broker actions occur deterministically in + * response to data arriving on client connections. There are two + * situations where this assumption fails: + * - sending data in response to polling local connections for writabiliy. + * - taking actions based on a timer or timestamp comparison. + * + * IMPORTANT NOTE: any time code is added to the broker that uses timers, + * the cluster may need to be updated to take account of this. + * + * + * USE OF TIMESTAMPS IN THE BROKER + * + * The following are the current areas where broker uses timers or timestamps: + * + * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. + * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * + * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * + * - LinkRegistry: only cluster elder is ever active for links. + * + * - management::ManagementBroker: uses MessageHandler supplied by cluster + * to send messages to the broker via the cluster. + * + * - Dtx: not yet supported with cluster. + * + * cluster::ExpiryPolicy implements the strategy for message expiry. + * + * CLUSTER PROTOCOL OVERVIEW + * + * Messages sent to/from CPG are called Events. + * + * An Event carries a ConnectionId, which includes a MemberId and a + * connection number. + * + * Events are either + * - Connection events: non-0 connection number and are associated with a connection. + * - Cluster Events: 0 connection number, are not associated with a connectin. + * + * Events are further categorized as: + * - Control: carries method frame(s) that affect cluster behavior. + * - Data: carries raw data received from a client connection. + * + * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml + * which defines two classes: + * - cluster: cluster control information. + * - cluster.connection: control information for a specific connection. + * + * The following combinations are legal: + * - Data frames carrying connection data. + * - Cluster control events carrying cluster commands. + * - Connection control events carrying cluster.connection commands. + * - Connection control events carrying non-cluster frames: frames sent to the client. + * e.g. flow-control frames generated on a timer. + */ #include "Cluster.h" #include "ClusterSettings.h" #include "Connection.h" @@ -30,6 +98,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" +#include "qpid/framing/frame_functors.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" @@ -41,6 +110,15 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" + +#include "qpid/framing/ConnectionStartOkBody.h" +#include "qpid/framing/ConnectionTuneBody.h" +#include "qpid/framing/ConnectionOpenBody.h" +#include "qpid/framing/SessionAttachBody.h" +#include "qpid/framing/SessionRequestTimeoutBody.h" +#include "qpid/framing/SessionCommandPointBody.h" +#include "qpid/framing/AMQP_ClientProxy.h" + #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -57,6 +135,7 @@ #include <iterator> #include <map> #include <ostream> +#include <sstream> namespace qpid { namespace cluster { @@ -127,11 +206,10 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); - - // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - + broker.setClusterMessageHandler(*this); if (settings.quorum) quorum.init(); + cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -666,6 +744,7 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { } void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + if (state == LEFT) return; // If we receive an errorCheck here, it's because we have processed past the point // of the error so respond with ERROR_TYPE_NONE assert(map.getFrameSeq() >= frameSeq); @@ -674,4 +753,53 @@ void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); } +size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + f.encodedSize(); } + +// +// If the broker needs to send messages to itself in an +// unpredictable context (e.g. management messages generated when +// a timer expires) it uses "selfConnection" +// +// selfConnection behaves as a local client connection, with +// respect to replication. However instead of mcasting data from a +// client, data for the selfConnection is mcast directly from +// Cluster::handle. +// +void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) { + // NOTE: don't take the lock here. We don't need to as mcast is thread safe, + // and locking here can cause deadlock with management locks. + // + + // Create self-connection on demand + if (selfConnection == ConnectionId()) { + QPID_LOG(debug, "Initialize self-connection"); + ostringstream name; + name << "qpid.cluster-self." << self; + ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), self, false, false); + selfConnection = selfc->getId(); + vector<AMQFrame> frames; + frames.push_back(AMQFrame((ConnectionStartOkBody()))); + frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120)))); + frames.push_back(AMQFrame((ConnectionOpenBody()))); + frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), name.str(), false)))); + frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 0))); + frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 0, 0))); + size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); + vector<char> store(size); + Buffer buf(store.data(), size); + for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, _1, boost::ref(buf))); + assert(buf.available() == 0); + selfc->decode(store.data(), size); // Multicast + } + + QPID_LOG(trace, "Message to self on " << selfConnection << ": " << *msg->getFrames().getMethod()); + const FrameSet& frames = msg->getFrames(); + size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); + Event e(DATA, selfConnection, size); + Buffer buf(e.getData(), e.getSize()); + EncodeFrame encoder(buf); + msg->getFrames().map(encoder); + mcast.mcast(e); +} + }} // namespace qpid::cluster |
