summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.h')
-rw-r--r--cpp/src/qpid/cluster/Cluster.h308
1 files changed, 0 insertions, 308 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
deleted file mode 100644
index 78d325cdf9..0000000000
--- a/cpp/src/qpid/cluster/Cluster.h
+++ /dev/null
@@ -1,308 +0,0 @@
-#ifndef QPID_CLUSTER_CLUSTER_H
-#define QPID_CLUSTER_CLUSTER_H
-
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "ClusterMap.h"
-#include "ClusterSettings.h"
-#include "Cpg.h"
-#include "Decoder.h"
-#include "ErrorCheck.h"
-#include "Event.h"
-#include "EventFrame.h"
-#include "ExpiryPolicy.h"
-#include "FailoverExchange.h"
-#include "InitialStatusMap.h"
-#include "LockedConnectionMap.h"
-#include "Multicaster.h"
-#include "NoOpConnectionOutputHandler.h"
-#include "PollableQueue.h"
-#include "PollerDispatch.h"
-#include "Quorum.h"
-#include "StoreStatus.h"
-#include "UpdateReceiver.h"
-
-#include "qmf/org/apache/qpid/cluster/Cluster.h"
-#include "qpid/Url.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/sys/Monitor.h"
-
-#include <boost/bind.hpp>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/optional.hpp>
-
-#include <algorithm>
-#include <map>
-#include <vector>
-
-namespace qpid {
-
-namespace broker {
-class Message;
-}
-
-namespace framing {
-class AMQBody;
-struct Uuid;
-}
-
-namespace cluster {
-
-class Connection;
-struct EventFrame;
-class ClusterTimer;
-class UpdateDataExchange;
-
-/**
- * Connection to the cluster
- */
-class Cluster : private Cpg::Handler, public management::Manageable {
- public:
- typedef boost::intrusive_ptr<Connection> ConnectionPtr;
- typedef std::vector<ConnectionPtr> ConnectionVector;
-
- // Public functions are thread safe unless otherwise mentioned in a comment.
-
- // Construct the cluster in plugin earlyInitialize.
- Cluster(const ClusterSettings&, broker::Broker&);
- virtual ~Cluster();
-
- // Called by plugin initialize: cluster start-up requires transport plugins .
- // Thread safety: only called by plugin initialize.
- void initialize();
-
- // Connection map.
- void addLocalConnection(const ConnectionPtr&);
- void addShadowConnection(const ConnectionPtr&);
- void erase(const ConnectionId&);
-
- // URLs of current cluster members.
- std::vector<std::string> getIds() const;
- std::vector<Url> getUrls() const;
- boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
-
- // Leave the cluster - called when fatal errors occur.
- void leave();
-
- // Update completed - called in update thread
- void updateInClosed();
- void updateInDone(const ClusterMap&);
- void updateInRetracted();
- // True if we are expecting to receive catch-up connections.
- bool isExpectingUpdate();
-
- MemberId getId() const;
- broker::Broker& getBroker() const;
- Multicaster& getMulticast() { return mcast; }
-
- const ClusterSettings& getSettings() const { return settings; }
-
- void deliverFrame(const EventFrame&);
-
- // Called in deliverFrame thread to indicate an error from the broker.
- void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg);
-
- // Called only during update by Connection::shadowReady
- Decoder& getDecoder() { return decoder; }
-
- ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-
- UpdateReceiver& getUpdateReceiver() { return updateReceiver; }
-
- bool isElder() const;
-
- // Generates a log message for debugging purposes.
- std::string debugSnapshot();
-
- // Defer messages delivered in an unsafe context by multicasting.
- bool deferDeliveryImpl(const std::string& queue,
- const boost::intrusive_ptr<broker::Message>& msg);
-
- private:
- typedef sys::Monitor::ScopedLock Lock;
-
- typedef PollableQueue<Event> PollableEventQueue;
- typedef PollableQueue<EventFrame> PollableFrameQueue;
- typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
-
- /** Version number of the cluster protocol, to avoid mixed versions. */
- static const uint32_t CLUSTER_VERSION;
-
- // NB: A dummy Lock& parameter marks functions that must only be
- // called with Cluster::lock locked.
-
- void leave(Lock&);
- std::vector<std::string> getIds(Lock&) const;
- std::vector<Url> getUrls(Lock&) const;
-
- // == Called in main thread from Broker destructor.
- void brokerShutdown();
-
- // == Called in deliverEventQueue thread
- void deliveredEvent(const Event&);
-
- // == Called in deliverFrameQueue thread
- void deliveredFrame(const EventFrame&);
- void processFrame(const EventFrame&, Lock&);
-
- // Cluster controls implement XML methods from cluster.xml.
- void updateRequest(const MemberId&, const std::string&, Lock&);
- void updateOffer(const MemberId& updater, uint64_t updatee, Lock&);
- void retractOffer(const MemberId& updater, uint64_t updatee, Lock&);
- void initialStatus(const MemberId&,
- uint32_t version,
- bool active,
- const framing::Uuid& clusterId,
- framing::cluster::StoreState,
- const framing::Uuid& shutdownId,
- const std::string& firstConfig,
- Lock&);
- void ready(const MemberId&, const std::string&, Lock&);
- void configChange(const MemberId&,
- const std::string& members,
- const std::string& left,
- const std::string& joined,
- Lock& l);
- void messageExpired(const MemberId&, uint64_t, Lock& l);
- void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&);
- void timerWakeup(const MemberId&, const std::string& name, Lock&);
- void timerDrop(const MemberId&, const std::string& name, Lock&);
- void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&);
- void deliverToQueue(const std::string& queue, const std::string& message, Lock&);
-
- // Helper functions
- ConnectionPtr getConnection(const EventFrame&, Lock&);
- ConnectionVector getConnections(Lock&);
- void updateStart(const MemberId& updatee, const Url& url, Lock&);
- void makeOffer(const MemberId&, Lock&);
- void setReady(Lock&);
- void memberUpdate(Lock&);
- void setClusterId(const framing::Uuid&, Lock&);
- void erase(const ConnectionId&, Lock&);
- void requestUpdate(Lock& );
- void initMapCompleted(Lock&);
- void becomeElder(Lock&);
- void setMgmtStatus(Lock&);
- void updateMgmtMembership(Lock&);
-
- // == Called in CPG dispatch thread
- void deliver( // CPG deliver callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name *group,
- uint32_t /*nodeid*/,
- uint32_t /*pid*/,
- void* /*msg*/,
- int /*msg_len*/);
-
- void deliverEvent(const Event&);
-
- void configChange( // CPG config change callback.
- cpg_handle_t /*handle*/,
- const struct cpg_name */*group*/,
- const struct cpg_address */*members*/, int /*nMembers*/,
- const struct cpg_address */*left*/, int /*nLeft*/,
- const struct cpg_address */*joined*/, int /*nJoined*/
- );
-
- // == Called in management threads.
- virtual qpid::management::ManagementObject* GetManagementObject() const;
- virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
-
- void stopClusterNode(Lock&);
- void stopFullCluster(Lock&);
-
- // == Called in connection IO threads .
- void checkUpdateIn(Lock&);
-
- // == Called in UpdateClient thread.
- void updateOutDone();
- void updateOutError(const std::exception&);
- void updateOutDone(Lock&);
-
- // Immutable members set on construction, never changed.
- const ClusterSettings settings;
- broker::Broker& broker;
- qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
- boost::shared_ptr<sys::Poller> poller;
- Cpg cpg;
- const std::string name;
- Url myUrl;
- const MemberId self;
- framing::Uuid clusterId;
- NoOpConnectionOutputHandler shadowOut;
- qpid::management::ManagementAgent* mAgent;
- boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
-
- // Thread safe members
- Multicaster mcast;
- PollerDispatch dispatcher;
- PollableEventQueue deliverEventQueue;
- PollableFrameQueue deliverFrameQueue;
- boost::shared_ptr<FailoverExchange> failoverExchange;
- boost::shared_ptr<UpdateDataExchange> updateDataExchange;
- Quorum quorum;
- LockedConnectionMap localConnections;
-
- // Used only in deliverEventQueue thread or when stalled for update.
- Decoder decoder;
- bool discarding;
-
-
- // Remaining members are protected by lock.
- mutable sys::Monitor lock;
-
-
- // Local cluster state, cluster map
- enum {
- PRE_INIT,///< Have not yet received complete initial status map.
- INIT, ///< Waiting to reach cluster-size.
- JOINER, ///< Sent update request, waiting for update offer.
- UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
- CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
- READY, ///< Fully operational
- OFFER, ///< Sent an offer, waiting for accept/reject.
- UPDATER, ///< Offer accepted, sending a state update.
- LEFT ///< Final state, left the cluster.
- } state;
-
- ConnectionMap connections;
- InitialStatusMap initMap;
- StoreStatus store;
- ClusterMap map;
- MemberSet elders;
- bool elder;
- size_t lastAliveCount;
- bool lastBroker;
- sys::Thread updateThread;
- boost::optional<ClusterMap> updatedMap;
- bool updateRetracted, updateClosed;
- ErrorCheck error;
- UpdateReceiver updateReceiver;
- ClusterTimer* timer;
-
- friend std::ostream& operator<<(std::ostream&, const Cluster&);
- friend struct ClusterDispatcher;
-};
-
-}} // namespace qpid::cluster
-
-
-
-#endif /*!QPID_CLUSTER_CLUSTER_H*/