diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
64 files changed, 0 insertions, 8465 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp deleted file mode 100644 index 0daf0c7f5a..0000000000 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ /dev/null @@ -1,1176 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/** - * <h1>CLUSTER IMPLEMENTATION OVERVIEW</h1> - * - * The cluster works on the principle that if all members of the - * cluster receive identical input, they will all produce identical - * results. cluster::Connections intercept data received from clients - * and multicast it via CPG. The data is processed (passed to the - * broker::Connection) only when it is received from CPG in cluster - * order. Each cluster member has Connection objects for directly - * connected clients and "shadow" Connection objects for connections - * to other members. - * - * This assumes that all broker actions occur deterministically in - * response to data arriving on client connections. There are two - * situations where this assumption fails: - * - sending data in response to polling local connections for writabiliy. - * - taking actions based on a timer or timestamp comparison. - * - * IMPORTANT NOTE: any time code is added to the broker that uses timers, - * the cluster may need to be updated to take account of this. - * - * - * USE OF TIMESTAMPS IN THE BROKER - * - * The following are the current areas where broker uses timers or timestamps: - * - * - Producer flow control: broker::SemanticState uses - * connection::getClusterOrderOutput. a FrameHandler that sends - * frames to the client via the cluster. Used by broker::SessionState - * - * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is - * implemented by cluster::ExpiryPolicy. - * - * - Connection heartbeat: sends connection controls, not part of - * session command counting so OK to ignore. - * - * - LinkRegistry: only cluster elder is ever active for links. - * - * - management::ManagementBroker: uses MessageHandler supplied by cluster - * to send messages to the broker via the cluster. - * - * - Dtx: not yet supported with cluster. - * - * cluster::ExpiryPolicy implements the strategy for message expiry. - * - * ClusterTimer implements periodic timed events in the cluster context. - * Used for periodic management events. - * - * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * - * Messages sent to/from CPG are called Events. - * - * An Event carries a ConnectionId, which includes a MemberId and a - * connection number. - * - * Events are either - * - Connection events: non-0 connection number and are associated with a connection. - * - Cluster Events: 0 connection number, are not associated with a connection. - * - * Events are further categorized as: - * - Control: carries method frame(s) that affect cluster behavior. - * - Data: carries raw data received from a client connection. - * - * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml - * which defines two classes: - * - cluster: cluster control information. - * - cluster.connection: control information for a specific connection. - * - * The following combinations are legal: - * - Data frames carrying connection data. - * - Cluster control events carrying cluster commands. - * - Connection control events carrying cluster.connection commands. - * - Connection control events carrying non-cluster frames: frames sent to the client. - * e.g. flow-control frames generated on a timer. - * - * <h1>CLUSTER INITIALIZATION OVERVIEW</h1> - * - * @see InitialStatusMap - * - * When a new member joins the CPG group, all members (including the - * new one) multicast their "initial status." The new member is in - * PRE_INIT mode until it gets a complete set of initial status - * messages from all cluster members. In a newly-forming cluster is - * then in INIT mode until the configured cluster-size members have - * joined. - * - * The newcomer uses initial status to determine - * - The cluster UUID - * - Am I speaking the correct version of the cluster protocol? - * - Do I need to get an update from an existing active member? - * - Can I recover from my own store? - * - * Pre-initialization happens in the Cluster constructor (plugin - * early-init phase) because it needs to set the recovery flag before - * the store initializes. This phase lasts until inital-status is - * received for all active members. The PollableQueues and Multicaster - * are in "bypass" mode during this phase since the poller has not - * started so there are no threads to serve pollable queues. - * - * The remaining initialization happens in Cluster::initialize() or, - * if cluster-size=N is specified, in the deliver thread when an - * initial-status control is delivered that brings the total to N. - */ -#include "qpid/Exception.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/sys/ClusterSafe.h" -#include "qpid/cluster/ClusterSettings.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/cluster/RetractClient.h" -#include "qpid/cluster/FailoverExchange.h" -#include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/cluster/UpdateExchange.h" -#include "qpid/cluster/ClusterTimer.h" - -#include "qpid/assert.h" -#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" -#include "qmf/org/apache/qpid/cluster/Package.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/SignalHandler.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQP_AllOperations.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" -#include "qpid/framing/ClusterConnectionAbortBody.h" -#include "qpid/framing/ClusterRetractOfferBody.h" -#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/framing/ClusterShutdownBody.h" -#include "qpid/framing/ClusterUpdateOfferBody.h" -#include "qpid/framing/ClusterUpdateRequestBody.h" -#include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterTimerWakeupBody.h" -#include "qpid/framing/ClusterDeliverToQueueBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/log/Helpers.h" -#include "qpid/log/Statement.h" -#include "qpid/management/ManagementAgent.h" -#include "qpid/memory.h" -#include "qpid/sys/Thread.h" - -#include <boost/shared_ptr.hpp> -#include <boost/bind.hpp> -#include <boost/cast.hpp> -#include <boost/current_function.hpp> -#include <algorithm> -#include <iterator> -#include <map> -#include <ostream> - - -namespace qpid { -namespace cluster { -using namespace qpid; -using namespace qpid::framing; -using namespace qpid::sys; -using namespace qpid::cluster; -using namespace framing::cluster; -using namespace std; -using management::ManagementAgent; -using management::ManagementObject; -using management::Manageable; -using management::Args; -namespace _qmf = ::qmf::org::apache::qpid::cluster; - -/** - * NOTE: must increment this number whenever any incompatible changes in - * cluster protocol/behavior are made. It allows early detection and - * sensible reporting of an attempt to mix different versions in a - * cluster. - * - * Currently use SVN revision to avoid clashes with versions from - * different branches. - */ -const uint32_t Cluster::CLUSTER_VERSION = 1097431; - -struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { - qpid::cluster::Cluster& cluster; - MemberId member; - Cluster::Lock& l; - ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} - - void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } - - void initialStatus(uint32_t version, bool active, const Uuid& clusterId, - uint8_t storeState, const Uuid& shutdownId, - const std::string& firstConfig) - { - cluster.initialStatus( - member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, - firstConfig, l); - } - void ready(const std::string& url) { - cluster.ready(member, url, l); - } - void configChange(const std::string& members, - const std::string& left, - const std::string& joined) - { - cluster.configChange(member, members, left, joined, l); - } - void updateOffer(uint64_t updatee) { - cluster.updateOffer(member, updatee, l); - } - void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } - void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { - cluster.errorCheck(member, type, frameSeq, l); - } - void timerWakeup(const std::string& name) { cluster.timerWakeup(member, name, l); } - void timerDrop(const std::string& name) { cluster.timerDrop(member, name, l); } - void shutdown(const Uuid& id) { cluster.shutdown(member, id, l); } - void deliverToQueue(const std::string& queue, const std::string& message) { - cluster.deliverToQueue(queue, message, l); - } - bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } -}; - -Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), - broker(b), - mgmtObject(0), - poller(b.getPoller()), - cpg(*this), - name(settings.name), - self(cpg.self()), - clusterId(true), - mAgent(0), - expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())), - mcast(cpg, poller, boost::bind(&Cluster::leave, this)), - dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), - boost::bind(&Cluster::leave, this), - "Error decoding events, may indicate a broker version mismatch", - poller), - deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1), - boost::bind(&Cluster::leave, this), - "Error delivering frames", - poller), - failoverExchange(new FailoverExchange(broker.GetVhostObject(), &broker)), - updateDataExchange(new UpdateDataExchange(*this)), - quorum(boost::bind(&Cluster::leave, this)), - decoder(boost::bind(&Cluster::deliverFrame, this, _1)), - discarding(true), - state(PRE_INIT), - initMap(self, settings.size), - store(broker.getDataDir().getPath()), - elder(false), - lastAliveCount(0), - lastBroker(false), - updateRetracted(false), - updateClosed(false), - error(*this) -{ - broker.setInCluster(true); - - // We give ownership of the timer to the broker and keep a plain pointer. - // This is OK as it means the timer has the same lifetime as the broker. - timer = new ClusterTimer(*this); - broker.setClusterTimer(std::auto_ptr<sys::Timer>(timer)); - - // Failover exchange provides membership updates to clients. - broker.getExchanges().registerExchange(failoverExchange); - - // Update exchange is used during updates to replicate messages - // without modifying delivery-properties.exchange. - broker.getExchanges().registerExchange( - boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - - // Update-data exchange is used for passing data that may be too large - // for single control frame. - broker.getExchanges().registerExchange(updateDataExchange); - - // Load my store status before we go into initialization - if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { - store.load(); - clusterId = store.getClusterId(); - QPID_LOG(notice, "Cluster store state: " << store) - } - cpg.join(name); - // pump the CPG dispatch manually till we get past PRE_INIT. - while (state == PRE_INIT) - cpg.dispatchOne(); -} - -Cluster::~Cluster() { - broker.setClusterTimer(std::auto_ptr<sys::Timer>(0)); // Delete cluster timer - if (updateThread) updateThread.join(); // Join the previous updatethread. -} - -void Cluster::initialize() { - if (settings.quorum) quorum.start(poller); - if (settings.url.empty()) - myUrl = Url::getIpAddressesUrl(broker.getPort(broker::Broker::TCP_TRANSPORT)); - else - myUrl = settings.url; - broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); - broker.deferDelivery = boost::bind(&Cluster::deferDeliveryImpl, this, _1, _2); - broker.setExpiryPolicy(expiryPolicy); - deliverEventQueue.bypassOff(); - deliverEventQueue.start(); - deliverFrameQueue.bypassOff(); - deliverFrameQueue.start(); - mcast.start(); - - /// Create management object - mAgent = broker.getManagementAgent(); - if (mAgent != 0){ - _qmf::Package packageInit(mAgent); - mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str()); - mAgent->addObject (mgmtObject); - } - - // Run initMapCompleted immediately to process the initial configuration - // that allowed us to transition out of PRE_INIT - assert(state == INIT); - initMapCompleted(*(Mutex::ScopedLock*)0); // Fake lock, single-threaded context. - - // Add finalizer last for exception safety. - broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this)); - - // Start dispatching CPG events. - dispatcher.start(); -} - -// Called in connection thread to insert a client connection. -void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) { - assert(c->getId().getMember() == self); - localConnections.insert(c); -} - -// Called in connection thread to insert an updated shadow connection. -void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { - QPID_LOG(debug, *this << " new shadow connection " << c->getId()); - // Safe to use connections here because we're pre-catchup, stalled - // and discarding, so deliveredFrame is not processing any - // connection events. - assert(discarding); - pair<ConnectionMap::iterator, bool> ib - = connections.insert(ConnectionMap::value_type(c->getId(), c)); - assert(ib.second); -} - -void Cluster::erase(const ConnectionId& id) { - Lock l(lock); - erase(id,l); -} - -// Called by Connection::deliverClose() in deliverFrameQueue thread. -void Cluster::erase(const ConnectionId& id, Lock&) { - connections.erase(id); - decoder.erase(id); -} - -std::vector<string> Cluster::getIds() const { - Lock l(lock); - return getIds(l); -} - -std::vector<string> Cluster::getIds(Lock&) const { - return map.memberIds(); -} - -std::vector<Url> Cluster::getUrls() const { - Lock l(lock); - return getUrls(l); -} - -std::vector<Url> Cluster::getUrls(Lock&) const { - return map.memberUrls(); -} - -void Cluster::leave() { - Lock l(lock); - leave(l); -} - -#define LEAVE_TRY(STMT) try { STMT; } \ - catch (const std::exception& e) { \ - QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ - } do {} while(0) - -void Cluster::leave(Lock&) { - if (state != LEFT) { - state = LEFT; - QPID_LOG(notice, *this << " leaving cluster " << name); - // Finalize connections now now to avoid problems later in destructor. - ClusterSafeScope css; // Don't trigger cluster-safe assertions. - LEAVE_TRY(localConnections.clear()); - LEAVE_TRY(connections.clear()); - LEAVE_TRY(broker::SignalHandler::shutdown()); - } -} - -// Deliver CPG message. -void Cluster::deliver( - cpg_handle_t /*handle*/, - const cpg_name* /*group*/, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) -{ - MemberId from(nodeid, pid); - framing::Buffer buf(static_cast<char*>(msg), msg_len); - Event e(Event::decodeCopy(from, buf)); - deliverEvent(e); -} - -void Cluster::deliverEvent(const Event& e) { deliverEventQueue.push(e); } - -void Cluster::deliverFrame(const EventFrame& e) { deliverFrameQueue.push(e); } - -const ClusterUpdateOfferBody* castUpdateOffer(const framing::AMQBody* body) { - return (body && body->getMethod() && - body->getMethod()->isA<ClusterUpdateOfferBody>()) ? - static_cast<const ClusterUpdateOfferBody*>(body) : 0; -} - -const ClusterConnectionAnnounceBody* castAnnounce( const framing::AMQBody *body) { - return (body && body->getMethod() && - body->getMethod()->isA<ClusterConnectionAnnounceBody>()) ? - static_cast<const ClusterConnectionAnnounceBody*>(body) : 0; -} - -// Handler for deliverEventQueue. -// This thread decodes frames from events. -void Cluster::deliveredEvent(const Event& e) { - if (e.isCluster()) { - EventFrame ef(e, e.getFrame()); - // Stop the deliverEventQueue on update offers. - // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this - // offer: the one making the offer, or the one receiving it. - const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); - if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { - QPID_LOG(info, *this << " stall for update offer from " << e.getMemberId() - << " to " << MemberId(offer->getUpdatee())); - deliverEventQueue.stop(); - } - deliverFrame(ef); - } - else if(!discarding) { - if (e.isControl()) - deliverFrame(EventFrame(e, e.getFrame())); - else { - try { decoder.decode(e, e.getData()); } - catch (const Exception& ex) { - // Close a connection that is sending us invalid data. - QPID_LOG(error, *this << " aborting connection " - << e.getConnectionId() << ": " << ex.what()); - framing::AMQFrame abort((ClusterConnectionAbortBody())); - deliverFrame(EventFrame(EventHeader(CONTROL, e.getConnectionId()), abort)); - } - } - } -} - -void Cluster::flagError( - Connection& connection, ErrorCheck::ErrorType type, const std::string& msg) -{ - Mutex::ScopedLock l(lock); - if (connection.isCatchUp()) { - QPID_LOG(critical, *this << " error on update connection " << connection - << ": " << msg); - leave(l); - } - error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); -} - -// Handler for deliverFrameQueue. -// This thread executes the main logic. -void Cluster::deliveredFrame(const EventFrame& efConst) { - Mutex::ScopedLock l(lock); - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. - if (state == LEFT) return; - EventFrame e(efConst); - const ClusterUpdateOfferBody* offer = castUpdateOffer(e.frame.getBody()); - if (offer && error.isUnresolved()) { - // We can't honour an update offer that is delivered while an - // error is in progress so replace it with a retractOffer and re-start - // the event queue. - e.frame = AMQFrame( - ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); - } - // Process each frame through the error checker. - if (error.isUnresolved()) { - error.delivered(e); - while (error.canProcess()) // There is a frame ready to process. - processFrame(error.getNext(), l); - } - else - processFrame(e, l); -} - - -void Cluster::processFrame(const EventFrame& e, Lock& l) { - if (e.isCluster()) { - QPID_LOG(trace, *this << " DLVR: " << e); - ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); - if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } - else if (state >= CATCHUP) { - map.incrementFrameSeq(); - ConnectionPtr connection = getConnection(e, l); - if (connection) { - QPID_LOG(trace, *this << " DLVR " << map.getFrameSeq() << ": " << e); - connection->deliveredFrame(e); - } - else - throw Exception(QPID_MSG("Unknown connection: " << e)); - } - else // Drop connection frames while state < CATCHUP - QPID_LOG(trace, *this << " DROP (joining): " << e); -} - -// Called in deliverFrameQueue thread -ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) { - ConnectionId id = e.connectionId; - ConnectionMap::iterator i = connections.find(id); - if (i != connections.end()) return i->second; - ConnectionPtr cp; - // If the frame is an announcement for a new connection, add it. - const ClusterConnectionAnnounceBody *announce = castAnnounce(e.frame.getBody()); - if (e.frame.getBody() && e.frame.getMethod() && announce) - { - if (id.getMember() == self) { // Announces one of my own - cp = localConnections.getErase(id); - assert(cp); - } - else { // New remote connection, create a shadow. - qpid::sys::SecuritySettings secSettings; - if (announce) { - secSettings.ssf = announce->getSsf(); - secSettings.authid = announce->getAuthid(); - secSettings.nodict = announce->getNodict(); - } - cp = new Connection(*this, shadowOut, announce->getManagementId(), id, secSettings); - } - connections.insert(ConnectionMap::value_type(id, cp)); - } - return cp; -} - -Cluster::ConnectionVector Cluster::getConnections(Lock&) { - ConnectionVector result(connections.size()); - std::transform(connections.begin(), connections.end(), result.begin(), - boost::bind(&ConnectionMap::value_type::second, _1)); - return result; -} - -// CPG config-change callback. -void Cluster::configChange ( - cpg_handle_t /*handle*/, - const cpg_name */*group*/, - const cpg_address *members, int nMembers, - const cpg_address *left, int nLeft, - const cpg_address *joined, int nJoined) -{ - Mutex::ScopedLock l(lock); - string membersStr, leftStr, joinedStr; - // Encode members and enqueue as an event so the config change can - // be executed in the correct thread. - for (const cpg_address* p = members; p < members+nMembers; ++p) - membersStr.append(MemberId(*p).str()); - for (const cpg_address* p = left; p < left+nLeft; ++p) - leftStr.append(MemberId(*p).str()); - for (const cpg_address* p = joined; p < joined+nJoined; ++p) - joinedStr.append(MemberId(*p).str()); - deliverEvent(Event::control(ClusterConfigChangeBody( - ProtocolVersion(), membersStr, leftStr, joinedStr), - self)); -} - -void Cluster::setReady(Lock&) { - state = READY; - mcast.setReady(); - broker.getQueueEvents().enable(); - enableClusterSafe(); // Enable cluster-safe assertions. -} - -// Set the management status from the Cluster::state. -// -// NOTE: Management updates are sent based on property changes. In -// order to keep consistency across the cluster, we touch the local -// management status property even if it is locally unchanged for any -// event that could have cause a cluster property change on any cluster member. -void Cluster::setMgmtStatus(Lock&) { - if (mgmtObject) - mgmtObject->set_status(state >= CATCHUP ? "ACTIVE" : "JOINING"); -} - -void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. - QPID_LOG(debug, *this << " initial status map complete. "); - setMgmtStatus(l); - if (state == PRE_INIT) { - // PRE_INIT means we're still in the earlyInitialize phase, in the constructor. - // We decide here whether we want to recover from our store. - // We won't recover if we are joining an active cluster or our store is dirty. - if (store.hasStore() && - store.getState() != STORE_STATE_EMPTY_STORE && - (initMap.isActive() || store.getState() == STORE_STATE_DIRTY_STORE)) - broker.setRecovery(false); // Ditch my current store. - state = INIT; - } - else if (state == INIT) { - // INIT means we are past Cluster::initialize(). - - // If we're forming an initial cluster (no active members) - // then we wait to reach the configured cluster-size - if (!initMap.isActive() && initMap.getActualSize() < initMap.getRequiredSize()) { - QPID_LOG(info, *this << initMap.getActualSize() - << " members, waiting for at least " << initMap.getRequiredSize()); - return; - } - - initMap.checkConsistent(); - elders = initMap.getElders(); - QPID_LOG(debug, *this << " elders: " << elders); - if (elders.empty()) - becomeElder(l); - else { - broker.getLinks().setPassive(true); - broker.getQueueEvents().disable(); - QPID_LOG(info, *this << " not active for links."); - } - setClusterId(initMap.getClusterId(), l); - - if (initMap.isUpdateNeeded()) { // Joining established cluster. - broker.setRecovery(false); // Ditch my current store. - broker.setClusterUpdatee(true); - if (mAgent) mAgent->suppress(true); // Suppress mgmt output during update. - state = JOINER; - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - QPID_LOG(notice, *this << " joining cluster " << name); - } - else { // I can go ready. - discarding = false; - setReady(l); - memberUpdate(l); - updateMgmtMembership(l); - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - QPID_LOG(notice, *this << " joined cluster " << name); - } - } -} - -void Cluster::configChange(const MemberId&, - const std::string& membersStr, - const std::string& leftStr, - const std::string& joinedStr, - Lock& l) -{ - if (state == LEFT) return; - MemberSet members = decodeMemberSet(membersStr); - MemberSet left = decodeMemberSet(leftStr); - MemberSet joined = decodeMemberSet(joinedStr); - QPID_LOG(notice, *this << " configuration change: " << members); - QPID_LOG_IF(notice, !left.empty(), *this << " Members left: " << left); - QPID_LOG_IF(notice, !joined.empty(), *this << " Members joined: " << joined); - - // If we are still joining, make sure there is someone to give us an update. - elders = intersection(elders, members); - if (elders.empty() && INIT < state && state < CATCHUP) { - QPID_LOG(critical, "Cannot update, all potential updaters left the cluster."); - leave(l); - return; - } - bool memberChange = map.configChange(members); - - // Update initital status for members joining or leaving. - initMap.configChange(members); - if (initMap.isResendNeeded()) { - mcast.mcastControl( - ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), - initMap.getFirstConfigStr() - ), - self); - } - if (initMap.transitionToComplete()) initMapCompleted(l); - - if (state >= CATCHUP && memberChange) { - memberUpdate(l); - if (elders.empty()) becomeElder(l); - } - - updateMgmtMembership(l); // Update on every config change for consistency -} - -void Cluster::becomeElder(Lock&) { - if (elder) return; // We were already the elder. - // We are the oldest, reactive links if necessary - QPID_LOG(info, *this << " became the elder, active for links."); - elder = true; - broker.getLinks().setPassive(false); - timer->becomeElder(); -} - -void Cluster::makeOffer(const MemberId& id, Lock& ) { - if (state == READY && map.isJoiner(id)) { - state = OFFER; - QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self); - } -} - -namespace { -struct AppendQueue { - ostream* os; - AppendQueue(ostream& o) : os(&o) {} - void operator()(const boost::shared_ptr<broker::Queue>& q) { - (*os) << " " << q->getName() << "=" << q->getMessageCount(); - } -}; -} // namespace - -// Log a snapshot of broker state, used for debugging inconsistency problems. -// May only be called in deliver thread. -std::string Cluster::debugSnapshot() { - assertClusterSafe(); - std::ostringstream msg; - msg << "Member joined, frameSeq=" << map.getFrameSeq() << ", queue snapshot:"; - AppendQueue append(msg); - broker.getQueues().eachQueue(append); - return msg.str(); -} - -// Called from Broker::~Broker when broker is shut down. At this -// point we know the poller has stopped so no poller callbacks will be -// invoked. We must ensure that CPG has also shut down so no CPG -// callbacks will be invoked. -// -void Cluster::brokerShutdown() { - sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. - try { cpg.shutdown(); } - catch (const std::exception& e) { - QPID_LOG(error, *this << " shutting down CPG: " << e.what()); - } - delete this; -} - -void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { - map.updateRequest(id, url); - makeOffer(id, l); -} - -void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, - framing::cluster::StoreState store, - const framing::Uuid& shutdownId, - const std::string& firstConfig, - Lock& l) -{ - if (version != CLUSTER_VERSION) { - QPID_LOG(critical, *this << " incompatible cluster versions " << - version << " != " << CLUSTER_VERSION); - leave(l); - return; - } - QPID_LOG_IF(debug, state == PRE_INIT, *this - << " received initial status from " << member); - initMap.received( - member, - ClusterInitialStatusBody(ProtocolVersion(), version, active, id, - store, shutdownId, firstConfig) - ); - if (initMap.transitionToComplete()) initMapCompleted(l); -} - -void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { - try { - if (map.ready(id, Url(url))) - memberUpdate(l); - if (state == CATCHUP && id == self) { - setReady(l); - QPID_LOG(notice, *this << " caught up."); - } - } catch (const Url::Invalid& e) { - QPID_LOG(error, "Invalid URL in cluster ready command: " << url); - } - // Update management on every ready event to be consistent across cluster. - setMgmtStatus(l); - updateMgmtMembership(l); -} - -void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { - // NOTE: deliverEventQueue has been stopped at the update offer by - // deliveredEvent in case an update is required. - if (state == LEFT) return; - MemberId updatee(updateeInt); - boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == self) { - assert(state == OFFER); - if (url) // My offer was first. - updateStart(updatee, *url, l); - else { // Another offer was first. - QPID_LOG(info, *this << " cancelled offer to " << updatee << " unstall"); - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - deliverEventQueue.start(); // Go back to normal processing - } - } - else if (updatee == self && url) { - assert(state == JOINER); - state = UPDATEE; - QPID_LOG(notice, *this << " receiving update from " << updater); - checkUpdateIn(l); - } - else { - QPID_LOG(info, *this << " unstall, ignore update " << updater - << " to " << updatee); - deliverEventQueue.start(); // Not involved in update. - } - if (updatee != self && url) { - QPID_LOG(debug, debugSnapshot()); - if (mAgent) mAgent->clusterUpdate(); - // Updatee will call clusterUpdate when update completes - } -} - -static client::ConnectionSettings connectionSettings(const ClusterSettings& settings) { - client::ConnectionSettings cs; - cs.username = settings.username; - cs.password = settings.password; - cs.mechanism = settings.mechanism; - return cs; -} - -void Cluster::retractOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) { - // An offer was received while handling an error, and converted to a retract. - // Behavior is very similar to updateOffer. - if (state == LEFT) return; - MemberId updatee(updateeInt); - boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == self) { - assert(state == OFFER); - if (url) { // My offer was first. - if (updateThread) - updateThread.join(); // Join the previous updateThread to avoid leaks. - updateThread = Thread(new RetractClient(*url, connectionSettings(settings))); - } - setReady(l); - makeOffer(map.firstJoiner(), l); // Maybe make another offer. - // Don't unstall the event queue, that was already done in deliveredFrame - } - QPID_LOG(debug,*this << " retracted offer " << updater << " to " << updatee); -} - -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { - // NOTE: deliverEventQueue is already stopped at the stall point by deliveredEvent. - if (state == LEFT) return; - assert(state == OFFER); - state = UPDATER; - QPID_LOG(notice, *this << " sending update to " << updatee << " at " << url); - if (updateThread) - updateThread.join(); // Join the previous updateThread to avoid leaks. - updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, - getConnections(l), decoder, - boost::bind(&Cluster::updateOutDone, this), - boost::bind(&Cluster::updateOutError, this, _1), - connectionSettings(settings))); -} - -// Called in network thread -void Cluster::updateInClosed() { - Lock l(lock); - assert(!updateClosed); - updateClosed = true; - checkUpdateIn(l); -} - -// Called in update thread. -void Cluster::updateInDone(const ClusterMap& m) { - Lock l(lock); - updatedMap = m; - checkUpdateIn(l); -} - -void Cluster::updateInRetracted() { - Lock l(lock); - updateRetracted = true; - map.clearStatus(); - checkUpdateIn(l); -} - -bool Cluster::isExpectingUpdate() { - Lock l(lock); - return state <= UPDATEE; -} - -// Called in update thread or deliver thread. -void Cluster::checkUpdateIn(Lock& l) { - if (state != UPDATEE) return; // Wait till we reach the stall point. - if (!updateClosed) return; // Wait till update connection closes. - if (updatedMap) { // We're up to date - map = *updatedMap; - failoverExchange->setUrls(getUrls(l)); - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); - state = CATCHUP; - memberUpdate(l); - // NB: don't updateMgmtMembership() here as we are not in the deliver - // thread. It will be updated on delivery of the "ready" we just mcast. - broker.setClusterUpdatee(false); - discarding = false; // OK to set, we're stalled for update. - QPID_LOG(notice, *this << " update complete, starting catch-up."); - QPID_LOG(debug, debugSnapshot()); // OK to call because we're stalled. - if (mAgent) { - // Update management agent now, after all update activity is complete. - updateDataExchange->updateManagementAgent(mAgent); - mAgent->suppress(false); // Enable management output. - mAgent->clusterUpdate(); - } - // Restore alternate exchange settings on exchanges. - broker.getExchanges().eachExchange( - boost::bind(&broker::Exchange::recoveryComplete, _1, - boost::ref(broker.getExchanges()))); - enableClusterSafe(); // Enable cluster-safe assertions - deliverEventQueue.start(); - } - else if (updateRetracted) { // Update was retracted, request another update - updateRetracted = false; - updateClosed = false; - state = JOINER; - QPID_LOG(notice, *this << " update retracted, sending new update request."); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); - deliverEventQueue.start(); - } -} - -void Cluster::updateOutDone() { - Monitor::ScopedLock l(lock); - updateOutDone(l); -} - -void Cluster::updateOutDone(Lock& l) { - QPID_LOG(notice, *this << " update sent"); - assert(state == UPDATER); - state = READY; - deliverEventQueue.start(); // Start processing events again. - makeOffer(map.firstJoiner(), l); // Try another offer -} - -void Cluster::updateOutError(const std::exception& e) { - Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); - updateOutDone(l); -} - -void Cluster ::shutdown(const MemberId& , const Uuid& id, Lock& l) { - QPID_LOG(notice, *this << " cluster shut down by administrator."); - if (store.hasStore()) store.clean(id); - leave(l); -} - -ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; } - -Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) { - Lock l(lock); - QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]"); - switch (methodId) { - case _qmf::Cluster::METHOD_STOPCLUSTERNODE : - { - _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; - stringstream stream; - stream << self; - if (iargs.i_brokerId == stream.str()) - stopClusterNode(l); - } - break; - case _qmf::Cluster::METHOD_STOPFULLCLUSTER : - stopFullCluster(l); - break; - default: - return Manageable::STATUS_UNKNOWN_METHOD; - } - return Manageable::STATUS_OK; -} - -void Cluster::stopClusterNode(Lock& l) { - QPID_LOG(notice, *this << " cluster member stopped by administrator."); - leave(l); -} - -void Cluster::stopFullCluster(Lock& ) { - QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(ProtocolVersion(), Uuid(true)), self); -} - -void Cluster::memberUpdate(Lock& l) { - // Ignore config changes while we are joining. - if (state < CATCHUP) return; - QPID_LOG(info, *this << " member update: " << map); - size_t aliveCount = map.aliveCount(); - assert(map.isAlive(self)); - failoverExchange->updateUrls(getUrls(l)); - - // Mark store clean if I am the only broker, dirty otherwise. - if (store.hasStore()) { - if (aliveCount == 1) { - if (store.getState() != STORE_STATE_CLEAN_STORE) { - QPID_LOG(notice, *this << "Sole member of cluster, marking store clean."); - store.clean(Uuid(true)); - } - } - else { - if (store.getState() != STORE_STATE_DIRTY_STORE) { - QPID_LOG(notice, "Running in a cluster, marking store dirty."); - store.dirty(); - } - } - } - - // If I am the last member standing, set queue policies. - if (aliveCount == 1 && lastAliveCount > 1 && state >= CATCHUP) { - QPID_LOG(notice, *this << " last broker standing, update queue policies"); - lastBroker = true; - broker.getQueues().updateQueueClusterState(true); - } - else if (aliveCount > 1 && lastBroker) { - QPID_LOG(notice, *this << " last broker standing joined by " << aliveCount-1 - << " replicas, updating queue policies."); - lastBroker = false; - broker.getQueues().updateQueueClusterState(false); - } - lastAliveCount = aliveCount; - - // Close connections belonging to members that have left the cluster. - ConnectionMap::iterator i = connections.begin(); - while (i != connections.end()) { - ConnectionMap::iterator j = i++; - MemberId m = j->second->getId().getMember(); - if (m != self && !map.isMember(m)) { - j->second->close(); - erase(j->second->getId(), l); - } - } -} - -// See comment on Cluster::setMgmtStatus -void Cluster::updateMgmtMembership(Lock& l) { - if (!mgmtObject) return; - std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); - string urlstr; - for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { - if (i != urls.begin()) urlstr += ";"; - urlstr += i->str(); - } - std::vector<string> ids = getIds(l); - string idstr; - for(std::vector<string>::iterator i = ids.begin(); i != ids.end(); i++ ) { - if (i != ids.begin()) idstr += ";"; - idstr += *i; - } - mgmtObject->set_members(urlstr); - mgmtObject->set_memberIDs(idstr); -} - -std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { - "PRE_INIT", "INIT", "JOINER", "UPDATEE", "CATCHUP", - "READY", "OFFER", "UPDATER", "LEFT" - }; - assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); - o << "cluster(" << cluster.self << " " << STATE[cluster.state]; - if (cluster.error.isUnresolved()) o << "/error"; - return o << ")"; -} - -MemberId Cluster::getId() const { - return self; // Immutable, no need to lock. -} - -broker::Broker& Cluster::getBroker() const { - return broker; // Immutable, no need to lock. -} - -void Cluster::setClusterId(const Uuid& uuid, Lock&) { - clusterId = uuid; - if (store.hasStore()) store.setClusterId(uuid); - if (mgmtObject) { - stringstream stream; - stream << self; - mgmtObject->set_clusterID(clusterId.str()); - mgmtObject->set_memberID(stream.str()); - } - QPID_LOG(notice, *this << " cluster-uuid = " << clusterId); -} - -void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { - expiryPolicy->deliverExpire(id); -} - -void Cluster::errorCheck(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq, Lock&) { - // If we see an errorCheck here (rather than in the ErrorCheck - // class) then we have processed succesfully past the point of the - // error. - if (state >= CATCHUP) // Don't respond pre catchup, we don't know what happened - error.respondNone(from, type, frameSeq); -} - -void Cluster::timerWakeup(const MemberId& , const std::string& name, Lock&) { - if (state >= CATCHUP) // Pre catchup our timer isn't set up. - timer->deliverWakeup(name); -} - -void Cluster::timerDrop(const MemberId& , const std::string& name, Lock&) { - QPID_LOG(debug, "Cluster timer drop " << map.getFrameSeq() << ": " << name) - if (state >= CATCHUP) // Pre catchup our timer isn't set up. - timer->deliverDrop(name); -} - -bool Cluster::isElder() const { - return elder; -} - -void Cluster::deliverToQueue(const std::string& queue, const std::string& message, Lock& l) -{ - broker::Queue::shared_ptr q = broker.getQueues().find(queue); - if (!q) { - QPID_LOG(critical, *this << " cluster delivery to non-existent queue: " << queue); - leave(l); - } - framing::Buffer buf(const_cast<char*>(message.data()), message.size()); - boost::intrusive_ptr<broker::Message> msg(new broker::Message); - msg->decodeHeader(buf); - msg->decodeContent(buf); - q->deliver(msg); -} - -bool Cluster::deferDeliveryImpl(const std::string& queue, - const boost::intrusive_ptr<broker::Message>& msg) -{ - if (isClusterSafe()) return false; - std::string message; - message.resize(msg->encodedSize()); - framing::Buffer buf(const_cast<char*>(message.data()), message.size()); - msg->encode(buf); - mcast.mcastControl(ClusterDeliverToQueueBody(ProtocolVersion(), queue, message), self); - return true; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h deleted file mode 100644 index 78d325cdf9..0000000000 --- a/cpp/src/qpid/cluster/Cluster.h +++ /dev/null @@ -1,308 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTER_H -#define QPID_CLUSTER_CLUSTER_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "ClusterMap.h" -#include "ClusterSettings.h" -#include "Cpg.h" -#include "Decoder.h" -#include "ErrorCheck.h" -#include "Event.h" -#include "EventFrame.h" -#include "ExpiryPolicy.h" -#include "FailoverExchange.h" -#include "InitialStatusMap.h" -#include "LockedConnectionMap.h" -#include "Multicaster.h" -#include "NoOpConnectionOutputHandler.h" -#include "PollableQueue.h" -#include "PollerDispatch.h" -#include "Quorum.h" -#include "StoreStatus.h" -#include "UpdateReceiver.h" - -#include "qmf/org/apache/qpid/cluster/Cluster.h" -#include "qpid/Url.h" -#include "qpid/broker/Broker.h" -#include "qpid/management/Manageable.h" -#include "qpid/sys/Monitor.h" - -#include <boost/bind.hpp> -#include <boost/intrusive_ptr.hpp> -#include <boost/optional.hpp> - -#include <algorithm> -#include <map> -#include <vector> - -namespace qpid { - -namespace broker { -class Message; -} - -namespace framing { -class AMQBody; -struct Uuid; -} - -namespace cluster { - -class Connection; -struct EventFrame; -class ClusterTimer; -class UpdateDataExchange; - -/** - * Connection to the cluster - */ -class Cluster : private Cpg::Handler, public management::Manageable { - public: - typedef boost::intrusive_ptr<Connection> ConnectionPtr; - typedef std::vector<ConnectionPtr> ConnectionVector; - - // Public functions are thread safe unless otherwise mentioned in a comment. - - // Construct the cluster in plugin earlyInitialize. - Cluster(const ClusterSettings&, broker::Broker&); - virtual ~Cluster(); - - // Called by plugin initialize: cluster start-up requires transport plugins . - // Thread safety: only called by plugin initialize. - void initialize(); - - // Connection map. - void addLocalConnection(const ConnectionPtr&); - void addShadowConnection(const ConnectionPtr&); - void erase(const ConnectionId&); - - // URLs of current cluster members. - std::vector<std::string> getIds() const; - std::vector<Url> getUrls() const; - boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; } - - // Leave the cluster - called when fatal errors occur. - void leave(); - - // Update completed - called in update thread - void updateInClosed(); - void updateInDone(const ClusterMap&); - void updateInRetracted(); - // True if we are expecting to receive catch-up connections. - bool isExpectingUpdate(); - - MemberId getId() const; - broker::Broker& getBroker() const; - Multicaster& getMulticast() { return mcast; } - - const ClusterSettings& getSettings() const { return settings; } - - void deliverFrame(const EventFrame&); - - // Called in deliverFrame thread to indicate an error from the broker. - void flagError(Connection&, ErrorCheck::ErrorType, const std::string& msg); - - // Called only during update by Connection::shadowReady - Decoder& getDecoder() { return decoder; } - - ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } - - UpdateReceiver& getUpdateReceiver() { return updateReceiver; } - - bool isElder() const; - - // Generates a log message for debugging purposes. - std::string debugSnapshot(); - - // Defer messages delivered in an unsafe context by multicasting. - bool deferDeliveryImpl(const std::string& queue, - const boost::intrusive_ptr<broker::Message>& msg); - - private: - typedef sys::Monitor::ScopedLock Lock; - - typedef PollableQueue<Event> PollableEventQueue; - typedef PollableQueue<EventFrame> PollableFrameQueue; - typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; - - /** Version number of the cluster protocol, to avoid mixed versions. */ - static const uint32_t CLUSTER_VERSION; - - // NB: A dummy Lock& parameter marks functions that must only be - // called with Cluster::lock locked. - - void leave(Lock&); - std::vector<std::string> getIds(Lock&) const; - std::vector<Url> getUrls(Lock&) const; - - // == Called in main thread from Broker destructor. - void brokerShutdown(); - - // == Called in deliverEventQueue thread - void deliveredEvent(const Event&); - - // == Called in deliverFrameQueue thread - void deliveredFrame(const EventFrame&); - void processFrame(const EventFrame&, Lock&); - - // Cluster controls implement XML methods from cluster.xml. - void updateRequest(const MemberId&, const std::string&, Lock&); - void updateOffer(const MemberId& updater, uint64_t updatee, Lock&); - void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); - void initialStatus(const MemberId&, - uint32_t version, - bool active, - const framing::Uuid& clusterId, - framing::cluster::StoreState, - const framing::Uuid& shutdownId, - const std::string& firstConfig, - Lock&); - void ready(const MemberId&, const std::string&, Lock&); - void configChange(const MemberId&, - const std::string& members, - const std::string& left, - const std::string& joined, - Lock& l); - void messageExpired(const MemberId&, uint64_t, Lock& l); - void errorCheck(const MemberId&, uint8_t type, SequenceNumber frameSeq, Lock&); - void timerWakeup(const MemberId&, const std::string& name, Lock&); - void timerDrop(const MemberId&, const std::string& name, Lock&); - void shutdown(const MemberId&, const framing::Uuid& shutdownId, Lock&); - void deliverToQueue(const std::string& queue, const std::string& message, Lock&); - - // Helper functions - ConnectionPtr getConnection(const EventFrame&, Lock&); - ConnectionVector getConnections(Lock&); - void updateStart(const MemberId& updatee, const Url& url, Lock&); - void makeOffer(const MemberId&, Lock&); - void setReady(Lock&); - void memberUpdate(Lock&); - void setClusterId(const framing::Uuid&, Lock&); - void erase(const ConnectionId&, Lock&); - void requestUpdate(Lock& ); - void initMapCompleted(Lock&); - void becomeElder(Lock&); - void setMgmtStatus(Lock&); - void updateMgmtMembership(Lock&); - - // == Called in CPG dispatch thread - void deliver( // CPG deliver callback. - cpg_handle_t /*handle*/, - const struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* /*msg*/, - int /*msg_len*/); - - void deliverEvent(const Event&); - - void configChange( // CPG config change callback. - cpg_handle_t /*handle*/, - const struct cpg_name */*group*/, - const struct cpg_address */*members*/, int /*nMembers*/, - const struct cpg_address */*left*/, int /*nLeft*/, - const struct cpg_address */*joined*/, int /*nJoined*/ - ); - - // == Called in management threads. - virtual qpid::management::ManagementObject* GetManagementObject() const; - virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - - void stopClusterNode(Lock&); - void stopFullCluster(Lock&); - - // == Called in connection IO threads . - void checkUpdateIn(Lock&); - - // == Called in UpdateClient thread. - void updateOutDone(); - void updateOutError(const std::exception&); - void updateOutDone(Lock&); - - // Immutable members set on construction, never changed. - const ClusterSettings settings; - broker::Broker& broker; - qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle - boost::shared_ptr<sys::Poller> poller; - Cpg cpg; - const std::string name; - Url myUrl; - const MemberId self; - framing::Uuid clusterId; - NoOpConnectionOutputHandler shadowOut; - qpid::management::ManagementAgent* mAgent; - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - - // Thread safe members - Multicaster mcast; - PollerDispatch dispatcher; - PollableEventQueue deliverEventQueue; - PollableFrameQueue deliverFrameQueue; - boost::shared_ptr<FailoverExchange> failoverExchange; - boost::shared_ptr<UpdateDataExchange> updateDataExchange; - Quorum quorum; - LockedConnectionMap localConnections; - - // Used only in deliverEventQueue thread or when stalled for update. - Decoder decoder; - bool discarding; - - - // Remaining members are protected by lock. - mutable sys::Monitor lock; - - - // Local cluster state, cluster map - enum { - PRE_INIT,///< Have not yet received complete initial status map. - INIT, ///< Waiting to reach cluster-size. - JOINER, ///< Sent update request, waiting for update offer. - UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. - CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational - OFFER, ///< Sent an offer, waiting for accept/reject. - UPDATER, ///< Offer accepted, sending a state update. - LEFT ///< Final state, left the cluster. - } state; - - ConnectionMap connections; - InitialStatusMap initMap; - StoreStatus store; - ClusterMap map; - MemberSet elders; - bool elder; - size_t lastAliveCount; - bool lastBroker; - sys::Thread updateThread; - boost::optional<ClusterMap> updatedMap; - bool updateRetracted, updateClosed; - ErrorCheck error; - UpdateReceiver updateReceiver; - ClusterTimer* timer; - - friend std::ostream& operator<<(std::ostream&, const Cluster&); - friend struct ClusterDispatcher; -}; - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_CLUSTER_H*/ diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp deleted file mode 100644 index a8389095c9..0000000000 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/ClusterMap.h" -#include "qpid/Url.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/log/Statement.h" -#include <boost/bind.hpp> -#include <algorithm> -#include <functional> -#include <iterator> -#include <ostream> - -using namespace std; -using namespace boost; - -namespace qpid { -using namespace framing; - -namespace cluster { - -namespace { - -void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { - MemberId id(vt.first); - set.insert(id); - string url = vt.second->get<string>(); - if (!url.empty()) - map.insert(ClusterMap::Map::value_type(id, Url(url))); -} - -void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_type& vt) { - ft.setString(vt.first.str(), vt.second.str()); -} - -} - -ClusterMap::ClusterMap() : frameSeq(0) {} - -ClusterMap::ClusterMap(const Map& map) : frameSeq(0) { - transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1)); - members = map; -} - -ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, - framing::SequenceNumber frameSeq_) - : frameSeq(frameSeq_) -{ - for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive))); - for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive))); -} - -void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const { - b.getJoiners().clear(); - for_each(joiners.begin(), joiners.end(), bind(&insertFieldTableFromMapValue, ref(b.getJoiners()), _1)); - for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { - if (!isMember(*i) && !isJoiner(*i)) - b.getJoiners().setString(i->str(), string()); - } - b.getMembers().clear(); - for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1)); - b.setFrameSeq(frameSeq); -} - -Url ClusterMap::getUrl(const Map& map, const MemberId& id) { - Map::const_iterator i = map.find(id); - return i == map.end() ? Url() : i->second; -} - -MemberId ClusterMap::firstJoiner() const { - return joiners.empty() ? MemberId() : joiners.begin()->first; -} - -vector<string> ClusterMap::memberIds() const { - vector<string> ids; - for (Map::const_iterator iter = members.begin(); - iter != members.end(); iter++) { - stringstream stream; - stream << iter->first; - ids.push_back(stream.str()); - } - return ids; -} - -vector<Url> ClusterMap::memberUrls() const { - vector<Url> urls(members.size()); - transform(members.begin(), members.end(), urls.begin(), - bind(&Map::value_type::second, _1)); - return urls; -} - -ClusterMap::Set ClusterMap::getAlive() const { return alive; } - -ClusterMap::Set ClusterMap::getMembers() const { - Set s; - transform(members.begin(), members.end(), inserter(s, s.begin()), - bind(&Map::value_type::first, _1)); - return s; -} - -ostream& operator<<(ostream& o, const ClusterMap::Map& m) { - ostream_iterator<MemberId> oi(o); - transform(m.begin(), m.end(), oi, bind(&ClusterMap::Map::value_type::first, _1)); - return o; -} - -ostream& operator<<(ostream& o, const ClusterMap& m) { - for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { - o << *i; - if (m.isMember(*i)) o << "(member)"; - else if (m.isJoiner(*i)) o << "(joiner)"; - else o << "(unknown)"; - o << " "; - } - o << "frameSeq=" << m.getFrameSeq(); - return o; -} - -bool ClusterMap::updateRequest(const MemberId& id, const string& url) { - try { - if (isAlive(id)) { - joiners[id] = Url(url); - return true; - } - } catch (const Url::Invalid&) { - QPID_LOG(error, "Invalid URL in cluster update request: " << url); - } - return false; -} - -bool ClusterMap::ready(const MemberId& id, const Url& url) { - return isAlive(id) && members.insert(Map::value_type(id,url)).second; -} - -bool ClusterMap::configChange(const Set& update) { - bool memberChange = false; - Set removed; - set_difference(alive.begin(), alive.end(), - update.begin(), update.end(), - inserter(removed, removed.begin())); - alive = update; - for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { - memberChange = memberChange || members.erase(*i); - joiners.erase(*i); - } - return memberChange; -} - -optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { - Map::iterator i = joiners.find(to); - if (isAlive(from) && i != joiners.end()) { - Url url= i->second; - joiners.erase(i); // No longer a potential updatee. - return url; - } - return optional<Url>(); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h deleted file mode 100644 index cfa4ad924a..0000000000 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ /dev/null @@ -1,106 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERMAP_H -#define QPID_CLUSTER_CLUSTERMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MemberSet.h" -#include "qpid/Url.h" -#include "qpid/framing/ClusterConnectionMembershipBody.h" -#include "qpid/framing/SequenceNumber.h" - -#include <boost/function.hpp> -#include <boost/optional.hpp> - -#include <vector> -#include <deque> -#include <map> -#include <iosfwd> - -namespace qpid { -namespace cluster { - -/** - * Map of established cluster members and joiners waiting for an update, - * along with other cluster state that must be updated. - */ -class ClusterMap { - public: - typedef std::map<MemberId, Url> Map; - typedef std::set<MemberId> Set; - - ClusterMap(); - ClusterMap(const Map& map); - ClusterMap(const framing::FieldTable& joiners, const framing::FieldTable& members, - framing::SequenceNumber frameSeq); - - /** Update from config change. - *@return true if member set changed. - */ - bool configChange(const Set& members); - - bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } - bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } - bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); } - - Url getJoinerUrl(const MemberId& id) { return getUrl(joiners, id); } - Url getMemberUrl(const MemberId& id) { return getUrl(members, id); } - - /** First joiner in the cluster in ID order, target for offers */ - MemberId firstJoiner() const; - - /** Convert map contents to a cluster control body. */ - void toMethodBody(framing::ClusterConnectionMembershipBody&) const; - - size_t aliveCount() const { return alive.size(); } - size_t memberCount() const { return members.size(); } - std::vector<std::string> memberIds() const; - std::vector<Url> memberUrls() const; - Set getAlive() const; - Set getMembers() const; - - bool updateRequest(const MemberId& id, const std::string& url); - /** Return non-empty Url if accepted */ - boost::optional<Url> updateOffer(const MemberId& from, const MemberId& to); - - /**@return true If this is a new member */ - bool ready(const MemberId& id, const Url&); - - framing::SequenceNumber getFrameSeq() const { return frameSeq; } - framing::SequenceNumber incrementFrameSeq() { return ++frameSeq; } - - /** Clear out all knowledge of joiners & members, just keep alive set */ - void clearStatus() { joiners.clear(); members.clear(); } - - private: - Url getUrl(const Map& map, const MemberId& id); - - Map joiners, members; - Set alive; - framing::SequenceNumber frameSeq; - - friend std::ostream& operator<<(std::ostream&, const Map&); - friend std::ostream& operator<<(std::ostream&, const ClusterMap&); -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CLUSTERMAP_H*/ diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp deleted file mode 100644 index 2962daaa07..0000000000 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "config.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/ConnectionCodec.h" -#include "qpid/cluster/ClusterSettings.h" - -#include "qpid/cluster/SecureConnectionFactory.h" - -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/ConnectionCodec.h" -#include "qpid/cluster/UpdateClient.h" - -#include "qpid/broker/Broker.h" -#include "qpid/Plugin.h" -#include "qpid/Options.h" -#include "qpid/sys/AtomicValue.h" -#include "qpid/log/Statement.h" - -#include "qpid/management/ManagementAgent.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/SessionState.h" -#include "qpid/client/ConnectionSettings.h" - -#include <boost/shared_ptr.hpp> -#include <boost/utility/in_place_factory.hpp> -#include <boost/scoped_ptr.hpp> - -namespace qpid { -namespace cluster { - -using namespace std; -using broker::Broker; -using management::ManagementAgent; - - -/** Note separating options from settings to work around boost version differences. - * Old boost takes a reference to options objects, but new boost makes a copy. - * New boost allows a shared_ptr but that's not compatible with old boost. - */ -struct ClusterOptions : public Options { - ClusterSettings& settings; - - ClusterOptions(ClusterSettings& v) : Options("Cluster Options"), settings(v) { - addOptions() - ("cluster-name", optValue(settings.name, "NAME"), "Name of cluster to join") - ("cluster-url", optValue(settings.url,"URL"), - "Set URL of this individual broker, to be advertized to clients.\n" - "Defaults to a URL listing all the local IP addresses\n") - ("cluster-username", optValue(settings.username, ""), "Username for connections between brokers") - ("cluster-password", optValue(settings.password, ""), "Password for connections between brokers") - ("cluster-mechanism", optValue(settings.mechanism, ""), "Authentication mechanism for connections between brokers") -#if HAVE_LIBCMAN_H - ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") -#endif - ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") - ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") - ; - } -}; - -typedef boost::shared_ptr<sys::ConnectionCodec::Factory> CodecFactoryPtr; - -struct ClusterPlugin : public Plugin { - - ClusterSettings settings; - ClusterOptions options; - Cluster* cluster; - boost::scoped_ptr<ConnectionCodec::Factory> factory; - - ClusterPlugin() : options(settings), cluster(0) {} - - // Cluster needs to be initialized after the store - int initOrder() const { return Plugin::DEFAULT_INIT_ORDER+500; } - - Options* getOptions() { return &options; } - - void earlyInitialize(Plugin::Target& target) { - if (settings.name.empty()) return; // Only if --cluster-name option was specified. - Broker* broker = dynamic_cast<Broker*>(&target); - if (!broker) return; - cluster = new Cluster(settings, *broker); - CodecFactoryPtr simpleFactory(new broker::ConnectionFactory(*broker)); - CodecFactoryPtr clusterFactory(new ConnectionCodec::Factory(simpleFactory, *cluster)); - CodecFactoryPtr secureFactory(new SecureConnectionFactory(clusterFactory)); - broker->setConnectionFactory(secureFactory); - } - - void disallowManagementMethods(ManagementAgent* agent) { - if (!agent) return; - agent->disallowV1Methods(); - } - - void initialize(Plugin::Target& target) { - Broker* broker = dynamic_cast<Broker*>(&target); - if (broker && cluster) { - disallowManagementMethods(broker->getManagementAgent()); - cluster->initialize(); - } - } -}; - -static ClusterPlugin instance; // Static initialization. - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h deleted file mode 100644 index 8e708aa139..0000000000 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERSETTINGS_H -#define QPID_CLUSTER_CLUSTERSETTINGS_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/Url.h> -#include <string> - -namespace qpid { -namespace cluster { - -struct ClusterSettings { - std::string name; - std::string url; - bool quorum; - size_t readMax; - std::string username, password, mechanism; - size_t size; - - ClusterSettings() : quorum(false), readMax(10), size(1) - {} - - Url getUrl(uint16_t port) const { - if (url.empty()) return Url::getIpAddressesUrl(port); - return Url(url); - } -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CLUSTERSETTINGS_H*/ diff --git a/cpp/src/qpid/cluster/ClusterTimer.cpp b/cpp/src/qpid/cluster/ClusterTimer.cpp deleted file mode 100644 index f6e1c7a849..0000000000 --- a/cpp/src/qpid/cluster/ClusterTimer.cpp +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License "); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Cluster.h" -#include "ClusterTimer.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/ClusterTimerWakeupBody.h" -#include "qpid/framing/ClusterTimerDropBody.h" - -namespace qpid { -namespace cluster { - -using boost::intrusive_ptr; -using std::max; -using sys::Timer; -using sys::TimerTask; - -// -// Note on use of Broker::getTimer() rather than getClusterTime in broker code. -// The following uses of getTimer() are cluster safe: -// -// LinkRegistry: maintenance visits in timer can call Bridge::create/cancel -// but these don't modify any management state. -// -// broker::Connection: -// - Heartbeats use ClusterOrderOutput to ensure consistency -// - timeout: aborts connection in timer, cluster does an orderly connection close. -// -// SessionState: scheduledCredit - uses ClusterOrderProxy -// Broker::queueCleaner: cluster implements ExpiryPolicy for consistent expiry. -// -// Broker::dtxManager: dtx disabled with cluster. -// -// requestIOProcessing: called in doOutput. -// - - -ClusterTimer::ClusterTimer(Cluster& c) : cluster(c) { - // Allow more generous overrun threshold with cluster as we - // have to do a CPG round trip before executing the task. - overran = 10*sys::TIME_MSEC; - late = 100*sys::TIME_MSEC; -} - -ClusterTimer::~ClusterTimer() {} - -// Initialization or deliver thread. -void ClusterTimer::add(intrusive_ptr<TimerTask> task) -{ - QPID_LOG(trace, "Adding cluster timer task " << task->getName()); - Map::iterator i = map.find(task->getName()); - if (i != map.end()) - throw Exception(QPID_MSG("Task already exists with name " << task->getName())); - map[task->getName()] = task; - // Only the elder actually activates the task with the Timer base class. - if (cluster.isElder()) { - QPID_LOG(trace, "Elder activating cluster timer task " << task->getName()); - Timer::add(task); - } -} - -// Timer thread -void ClusterTimer::fire(intrusive_ptr<TimerTask> t) { - // Elder mcasts wakeup on fire, task is not fired until deliverWakeup - if (cluster.isElder()) { - QPID_LOG(trace, "Sending cluster timer wakeup " << t->getName()); - cluster.getMulticast().mcastControl( - framing::ClusterTimerWakeupBody(framing::ProtocolVersion(), t->getName()), - cluster.getId()); - } - else - QPID_LOG(trace, "Cluster timer task fired, but not elder " << t->getName()); -} - -// Timer thread -void ClusterTimer::drop(intrusive_ptr<TimerTask> t) { - // Elder mcasts drop, task is droped in deliverDrop - if (cluster.isElder()) { - QPID_LOG(trace, "Sending cluster timer drop " << t->getName()); - cluster.getMulticast().mcastControl( - framing::ClusterTimerDropBody(framing::ProtocolVersion(), t->getName()), - cluster.getId()); - } - else - QPID_LOG(trace, "Cluster timer task dropped, but not on elder " << t->getName()); -} - -// Deliver thread -void ClusterTimer::deliverWakeup(const std::string& name) { - QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); - Map::iterator i = map.find(name); - if (i == map.end()) - throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); - else { - intrusive_ptr<TimerTask> t = i->second; - map.erase(i); - Timer::fire(t); - } -} - -// Deliver thread -void ClusterTimer::deliverDrop(const std::string& name) { - QPID_LOG(trace, "Cluster timer drop delivered for " << name); - Map::iterator i = map.find(name); - if (i == map.end()) - throw Exception(QPID_MSG("Cluster timer drop non-existent task " << name)); - else { - intrusive_ptr<TimerTask> t = i->second; - map.erase(i); - } -} - -// Deliver thread -void ClusterTimer::becomeElder() { - for (Map::iterator i = map.begin(); i != map.end(); ++i) { - Timer::add(i->second); - } -} - -}} diff --git a/cpp/src/qpid/cluster/ClusterTimer.h b/cpp/src/qpid/cluster/ClusterTimer.h deleted file mode 100644 index 69f6c622e4..0000000000 --- a/cpp/src/qpid/cluster/ClusterTimer.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERTIMER_H -#define QPID_CLUSTER_CLUSTERTIMER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Timer.h" -#include <map> - -namespace qpid { -namespace cluster { - -class Cluster; - -/** - * Timer implementation that executes tasks consistently in the - * deliver thread across a cluster. Task is not executed when timer - * fires, instead the elder multicasts a wakeup. The task is executed - * when the wakeup is delivered. - */ -class ClusterTimer : public sys::Timer { - public: - ClusterTimer(Cluster&); - ~ClusterTimer(); - - void add(boost::intrusive_ptr<sys::TimerTask> task); - - void deliverWakeup(const std::string& name); - void deliverDrop(const std::string& name); - void becomeElder(); - - protected: - void fire(boost::intrusive_ptr<sys::TimerTask> task); - void drop(boost::intrusive_ptr<sys::TimerTask> task); - - private: - typedef std::map<std::string, boost::intrusive_ptr<sys::TimerTask> > Map; - Cluster& cluster; - Map map; -}; - - -}} - - -#endif /*!QPID_CLUSTER_CLUSTERTIMER_H*/ diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp deleted file mode 100644 index b9895290e9..0000000000 --- a/cpp/src/qpid/cluster/Connection.cpp +++ /dev/null @@ -1,728 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/amqp_0_10/Codecs.h" -#include "Connection.h" -#include "UpdateClient.h" -#include "Cluster.h" -#include "UpdateReceiver.h" -#include "qpid/assert.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/broker/TxBuffer.h" -#include "qpid/broker/TxPublish.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/broker/RecoveredDequeue.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Fairshare.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/Bridge.h" -#include "qpid/broker/StatefulQueueObserver.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AllInvoker.h" -#include "qpid/framing/DeliveryProperties.h" -#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" -#include "qpid/framing/ClusterConnectionAnnounceBody.h" -#include "qpid/framing/ConnectionCloseBody.h" -#include "qpid/framing/ConnectionCloseOkBody.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/ClusterSafe.h" -#include "qpid/types/Variant.h" -#include "qpid/management/ManagementAgent.h" -#include <boost/current_function.hpp> - - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace framing::cluster; -using amqp_0_10::ListCodec; -using types::Variant; - -qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL); - -Connection::NullFrameHandler Connection::nullFrameHandler; - -struct NullFrameHandler : public framing::FrameHandler { - void handle(framing::AMQFrame&) {} -}; - - -namespace { -sys::AtomicValue<uint64_t> idCounter; -const std::string shadowPrefix("[shadow]"); -} - - -// Shadow connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& mgmtId, - const ConnectionId& id, const qpid::sys::SecuritySettings& external) - : cluster(c), self(id), catchUp(false), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true), - expectProtocolHeader(false), - mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()), - secureConnection(0) -{} - -// Local connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& mgmtId, MemberId member, - bool isCatchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), announced(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), - mgmtId, - external, - isLink, - isCatchUp ? ++catchUpId : 0, - isCatchUp), // isCatchUp => shadow - expectProtocolHeader(isLink), - mcastFrameHandler(cluster.getMulticast(), self), - updateIn(c.getUpdateReceiver()), - secureConnection(0) -{ - if (isLocalClient()) { - giveReadCredit(cluster.getSettings().readMax); // Flow control - // Delay adding the connection to the management map until announce() - connectionCtor.delayManagement = true; - } - else { - // Catch-up shadow connections initialized using nextShadow id. - assert(catchUp); - if (!updateIn.nextShadowMgmtId.empty()) - connectionCtor.mgmtId = updateIn.nextShadowMgmtId; - updateIn.nextShadowMgmtId.clear(); - } - init(); - QPID_LOG(debug, cluster << " local connection " << *this); -} - -void Connection::setSecureConnection(broker::SecureConnection* sc) { - secureConnection = sc; - if (connection.get()) connection->setSecureConnection(sc); -} - -void Connection::init() { - connection = connectionCtor.construct(); - if (isLocalClient()) { - if (secureConnection) connection->setSecureConnection(secureConnection); - // Actively send cluster-order frames from local node - connection->setClusterOrderOutput(mcastFrameHandler); - } - else { // Shadow or catch-up connection - // Passive, discard cluster-order frames - connection->setClusterOrderOutput(nullFrameHandler); - // Disable client throttling, done by active node. - connection->setClientThrottling(false); - } - if (!isCatchUp()) - connection->setErrorListener(this); -} - -// Called when we have consumed a read buffer to give credit to the -// connection layer to continue reading. -void Connection::giveReadCredit(int credit) { - if (cluster.getSettings().readMax && credit) - output.giveReadCredit(credit); -} - -void Connection::announce( - const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict, - const std::string& username, const std::string& initialFrames) -{ - QPID_ASSERT(mgmtId == connectionCtor.mgmtId); - QPID_ASSERT(ssf == connectionCtor.external.ssf); - QPID_ASSERT(authid == connectionCtor.external.authid); - QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized but with management delayed. - if (isLocalClient()) { - connection->addManagementObject(); - } - else if (isShadow()) { - init(); - // Play initial frames into the connection. - Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); - AMQFrame frame; - while (frame.decode(buf)) - connection->received(frame); - connection->setUserId(username); - } - // Do managment actions now that the connection is replicated. - connection->raiseConnectEvent(); - QPID_LOG(debug, cluster << " replicated connection " << *this); -} - -Connection::~Connection() { - if (connection.get()) connection->setErrorListener(0); - // Don't trigger cluster-safe asserts in broker:: ~Connection as - // it may be called in an IO thread context during broker - // shutdown. - sys::ClusterSafeScope css; - connection.reset(); -} - -bool Connection::doOutput() { - return output.doOutput(); -} - -// Received from a directly connected client. -void Connection::received(framing::AMQFrame& f) { - if (!connection.get()) { - QPID_LOG(warning, cluster << " ignoring frame on closed connection " - << *this << ": " << f); - return; - } - QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { // Local catch-up connection. - currentChannel = f.getChannel(); - if (!framing::invoke(*this, *f.getBody()).wasHandled()) - connection->received(f); - } - else { // Shadow or updated catch-up connection. - if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { - if (isShadow()) - cluster.addShadowConnection(this); - AMQFrame ok((ConnectionCloseOkBody())); - connection->getOutput().send(ok); - output.closeOutput(); - catchUp = false; - } - else - QPID_LOG(warning, cluster << " ignoring unexpected frame " << *this << ": " << f); - } -} - -bool Connection::checkUnsupported(const AMQBody& body) { - std::string message; - if (body.getMethod()) { - switch (body.getMethod()->amqpClassId()) { - case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break; - } - } - if (!message.empty()) - connection->close(connection::CLOSE_CODE_FRAMING_ERROR, message); - return !message.empty(); -} - -struct GiveReadCreditOnExit { - Connection& connection; - int credit; - GiveReadCreditOnExit(Connection& connection_, int credit_) : - connection(connection_), credit(credit_) {} - ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); } -}; - -void Connection::deliverDoOutput(uint32_t limit) { - output.deliverDoOutput(limit); -} - -// Called in delivery thread, in cluster order. -void Connection::deliveredFrame(const EventFrame& f) { - GiveReadCreditOnExit gc(*this, f.readCredit); - assert(!catchUp); - currentChannel = f.frame.getChannel(); - if (f.frame.getBody() // frame can be emtpy with just readCredit - && !framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. - && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. - { - if (f.type == DATA) // incoming data frames to broker::Connection - connection->received(const_cast<AMQFrame&>(f.frame)); - else { // frame control, send frame via SessionState - broker::SessionState* ss = connection->getChannel(currentChannel).getSession(); - if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); - } - } -} - -// A local connection is closed by the network layer. Called in the connection thread. -void Connection::closed() { - try { - if (isUpdated()) { - QPID_LOG(debug, cluster << " update connection closed " << *this); - close(); - cluster.updateInClosed(); - } - else if (catchUp && cluster.isExpectingUpdate()) { - QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); - cluster.leave(); - } - else if (isLocal()) { - // This was a local replicated connection. Multicast a deliver - // closed and process any outstanding frames from the cluster - // until self-delivery of deliver-close. - output.closeOutput(); - if (announced) - cluster.getMulticast().mcastControl( - ClusterConnectionDeliverCloseBody(), self); - } - } - catch (const std::exception& e) { - QPID_LOG(error, cluster << " error closing connection " << *this << ": " << e.what()); - } -} - -// Self-delivery of close message, close the connection. -void Connection::deliverClose () { - close(); - cluster.erase(self); -} - -// Close the connection -void Connection::close() { - if (connection.get()) { - QPID_LOG(debug, cluster << " closed connection " << *this); - connection->closed(); - connection.reset(); - } -} - -// The connection has sent invalid data and should be aborted. -// All members will get the same abort since they all process the same data. -void Connection::abort() { - connection->abort(); - // Aborting the connection will result in a call to ::closed() - // and allow the connection to close in an orderly manner. -} - -// ConnectionCodec::decode receives read buffers from directly-connected clients. -size_t Connection::decode(const char* data, size_t size) { - GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default. - const char* ptr = data; - const char* end = data + size; - if (catchUp) { // Handle catch-up locally. - if (!cluster.isExpectingUpdate()) { - QPID_LOG(error, "Rejecting unexpected catch-up connection."); - abort(); // Cluster is not expecting catch-up connections. - } - bool wasOpen = connection->isOpen(); - Buffer buf(const_cast<char*>(ptr), size); - ptr += size; - while (localDecoder.decode(buf)) - received(localDecoder.getFrame()); - if (!wasOpen && connection->isOpen()) { - // Connections marked as federation links are allowed to proxy - // messages with user-ID that doesn't match the connection's - // authenticated ID. This is important for updates. - connection->setFederationLink(isCatchUp()); - } - } - else { // Multicast local connections. - assert(isLocalClient()); - assert(connection.get()); - if (!checkProtocolHeader(ptr, size)) // Updates ptr - return 0; // Incomplete header - - if (!connection->isOpen()) - processInitialFrames(ptr, end-ptr); // Updates ptr - - if (connection->isOpen() && end - ptr > 0) { - // We're multi-casting, we will give read credit on delivery. - grc.credit = 0; - cluster.getMulticast().mcastBuffer(ptr, end - ptr, self); - ptr = end; - } - } - return ptr - data; -} - -// Decode the protocol header if needed. Updates data and size -// returns true if the header is complete or already read. -bool Connection::checkProtocolHeader(const char*& data, size_t size) { - if (expectProtocolHeader) { - // This is an outgoing link connection, we will receive a protocol - // header which needs to be decoded first - framing::ProtocolInitiation pi; - Buffer buf(const_cast<char*&>(data), size); - if (pi.decode(buf)) { - //TODO: check the version is correct - expectProtocolHeader = false; - data += pi.encodedSize(); - } else { - return false; - } - } - return true; -} - -void Connection::processInitialFrames(const char*& ptr, size_t size) { - // Process the initial negotiation locally and store it so - // it can be replayed on other brokers in announce() - Buffer buf(const_cast<char*>(ptr), size); - framing::AMQFrame frame; - while (!connection->isOpen() && frame.decode(buf)) - received(frame); - initialFrames.append(ptr, buf.getPosition()); - ptr += buf.getPosition(); - if (connection->isOpen()) { // initial negotiation complete - cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody( - ProtocolVersion(), - connectionCtor.mgmtId, - connectionCtor.external.ssf, - connectionCtor.external.authid, - connectionCtor.external.nodict, - connection->getUserId(), - initialFrames), - getId()); - announced = true; - initialFrames.clear(); - } -} - -broker::SessionState& Connection::sessionState() { - return *connection->getChannel(currentChannel).getSession(); -} - -broker::SemanticState& Connection::semanticState() { - return sessionState().getSemanticState(); -} - -void Connection::shadowPrepare(const std::string& mgmtId) { - updateIn.nextShadowMgmtId = mgmtId; -} - -void Connection::shadowSetUser(const std::string& userId) { - connection->setUserId(userId); -} - -void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) -{ - broker::SemanticState::ConsumerImpl& c = semanticState().find(name); - c.position = position; - c.setBlocked(blocked); - if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - updateIn.consumerNumbering.add(c.shared_from_this()); -} - - -void Connection::sessionState( - const SequenceNumber& replayStart, - const SequenceNumber& sendCommandPoint, - const SequenceSet& sentIncomplete, - const SequenceNumber& expected, - const SequenceNumber& received, - const SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete) -{ - sessionState().setState( - replayStart, - sendCommandPoint, - sentIncomplete, - expected, - received, - unknownCompleted, - receivedIncomplete); - QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); - // The output tasks will be added later in the update process. - connection->getOutputTasks().removeAll(); -} - -void Connection::outputTask(uint16_t channel, const std::string& name) { - broker::SessionState* session = connection->getChannel(channel).getSession(); - if (!session) - throw Exception(QPID_MSG(cluster << " channel not attached " << *this - << "[" << channel << "] ")); - OutputTask* task = &session->getSemanticState().find(name); - connection->getOutputTasks().addOutputTask(task); -} - -void Connection::shadowReady( - uint64_t memberId, uint64_t connectionId, const string& mgmtId, - const string& username, const string& fragment, uint32_t sendMax) -{ - QPID_ASSERT(mgmtId == getBrokerConnection()->getMgmtId()); - ConnectionId shadowId = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this - << " becomes shadow " << shadowId); - self = shadowId; - connection->setUserId(username); - // OK to use decoder here because cluster is stalled for update. - cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); - connection->setErrorListener(this); - output.setSendMax(sendMax); -} - -void Connection::membership(const FieldTable& joiners, const FieldTable& members, - const framing::SequenceNumber& frameSeq) -{ - QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - updateIn.consumerNumbering.clear(); - closeUpdated(); - cluster.updateInDone(ClusterMap(joiners, members, frameSeq)); -} - -void Connection::retractOffer() { - QPID_LOG(info, cluster << " incoming update retracted on connection " << *this); - closeUpdated(); - cluster.updateInRetracted(); -} - -void Connection::closeUpdated() { - self.second = 0; // Mark this as completed update connection. - if (connection.get()) - connection->close(connection::CLOSE_CODE_NORMAL, "OK"); -} - -bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second; -} - -bool Connection::isShadow() const { - return self.first != cluster.getId(); -} - -bool Connection::isUpdated() const { - return self.first == cluster.getId() && self.second == 0; -} - - -boost::shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { - boost::shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname); - if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname)); - return queue; -} - -broker::QueuedMessage Connection::getUpdateMessage() { - boost::shared_ptr<broker::Queue> updateq = findQueue(UpdateClient::UPDATE); - assert(!updateq->isDurable()); - broker::QueuedMessage m = updateq->get(); - if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); - return m; -} - -void Connection::deliveryRecord(const string& qname, - const SequenceNumber& position, - const string& tag, - const SequenceNumber& id, - bool acquired, - bool accepted, - bool cancelled, - bool completed, - bool ended, - bool windowing, - bool enqueued, - uint32_t credit) -{ - broker::QueuedMessage m; - broker::Queue::shared_ptr queue = findQueue(qname); - if (!ended) { // Has a message - if (acquired) { // Message is on the update queue - m = getUpdateMessage(); - m.queue = queue.get(); - m.position = position; - if (enqueued) queue->updateEnqueued(m); //inform queue of the message - } else { // Message at original position in original queue - m = queue->find(position); - } - if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no update message")); - } - - broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); - dr.setId(id); - if (cancelled) dr.cancel(dr.getTag()); - if (completed) dr.complete(); - if (ended) dr.setEnded(); // Exsitance of message - semanticState().record(dr); // Part of the session's unacked list. -} - -void Connection::queuePosition(const string& qname, const SequenceNumber& position) { - findQueue(qname)->setPosition(position); -} - -void Connection::queueFairshareState(const std::string& qname, const uint8_t priority, const uint8_t count) -{ - if (!qpid::broker::Fairshare::setState(findQueue(qname)->getMessages(), priority, count)) { - QPID_LOG(error, "Failed to set fair share state on queue " << qname << "; this will result in inconsistencies."); - } -} - - -namespace { - // find a StatefulQueueObserver that matches a given identifier - class ObserverFinder { - const std::string id; - boost::shared_ptr<broker::QueueObserver> target; - ObserverFinder(const ObserverFinder&) {} - public: - ObserverFinder(const std::string& _id) : id(_id) {} - broker::StatefulQueueObserver *getObserver() - { - if (target) - return dynamic_cast<broker::StatefulQueueObserver *>(target.get()); - return 0; - } - void operator() (boost::shared_ptr<broker::QueueObserver> o) - { - if (!target) { - broker::StatefulQueueObserver *p = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (p && p->getId() == id) { - target = o; - } - } - } - }; -} - - -void Connection::queueObserverState(const std::string& qname, const std::string& observerId, const FieldTable& state) -{ - boost::shared_ptr<broker::Queue> queue(findQueue(qname)); - ObserverFinder finder(observerId); // find this observer - queue->eachObserver<ObserverFinder &>(finder); - broker::StatefulQueueObserver *so = finder.getObserver(); - if (so) { - so->setState( state ); - QPID_LOG(debug, "updated queue observer " << observerId << "'s state on queue " << qname << "; ..."); - return; - } - QPID_LOG(error, "Failed to find observer " << observerId << " state on queue " << qname << "; this will result in inconsistencies."); -} - -void Connection::expiryId(uint64_t id) { - cluster.getExpiryPolicy().setId(id); -} - -std::ostream& operator<<(std::ostream& o, const Connection& c) { - const char* type="unknown"; - if (c.isLocal()) type = "local"; - else if (c.isShadow()) type = "shadow"; - else if (c.isUpdated()) type = "updated"; - const broker::Connection* bc = c.getBrokerConnection(); - if (bc) o << bc->getMgmtId(); - else o << "<disconnected>"; - return o << "(" << c.getId() << " " << type << (c.isCatchUp() ? ",catchup":"") << ")"; -} - -void Connection::txStart() { - txBuffer.reset(new broker::TxBuffer()); -} -void Connection::txAccept(const framing::SequenceSet& acked) { - txBuffer->enlist(boost::shared_ptr<broker::TxAccept>( - new broker::TxAccept(acked, semanticState().getUnacked()))); -} - -void Connection::txDequeue(const std::string& queue) { - txBuffer->enlist(boost::shared_ptr<broker::RecoveredDequeue>( - new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload))); -} - -void Connection::txEnqueue(const std::string& queue) { - txBuffer->enlist(boost::shared_ptr<broker::RecoveredEnqueue>( - new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); -} - -void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); - for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) - txPub->deliverTo(findQueue((*i)->get<std::string>())); - txPub->delivered = delivered; - txBuffer->enlist(txPub); -} - -void Connection::txEnd() { - semanticState().setTxBuffer(txBuffer); -} - -void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) { - semanticState().setAccumulatedAck(s); -} - -void Connection::exchange(const std::string& encoded) { - Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - broker::Exchange::shared_ptr ex = broker::Exchange::decode(cluster.getBroker().getExchanges(), buf); - if(ex.get() && ex->isDurable() && !ex->getName().find("amq.") == 0 && !ex->getName().find("qpid.") == 0) { - cluster.getBroker().getStore().create(*(ex.get()), ex->getArgs()); - } - QPID_LOG(debug, cluster << " updated exchange " << ex->getName()); -} - -void Connection::sessionError(uint16_t , const std::string& msg) { - // Ignore errors before isOpen(), we're not multicasting yet. - if (connection->isOpen()) - cluster.flagError(*this, ERROR_TYPE_SESSION, msg); -} - -void Connection::connectionError(const std::string& msg) { - // Ignore errors before isOpen(), we're not multicasting yet. - if (connection->isOpen()) - cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg); -} - -void Connection::addQueueListener(const std::string& q, uint32_t listener) { - if (listener >= updateIn.consumerNumbering.size()) - throw Exception(QPID_MSG("Invalid listener ID: " << listener)); - findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); -} - -// -// This is the handler for incoming managementsetup messages. -// -void Connection::managementSetupState( - uint64_t objectNum, uint16_t bootSequence, const framing::Uuid& id, - const std::string& vendor, const std::string& product, const std::string& instance) -{ - QPID_LOG(debug, cluster << " updated management: object number=" - << objectNum << " boot sequence=" << bootSequence - << " broker-id=" << id - << " vendor=" << vendor - << " product=" << product - << " instance=" << instance); - management::ManagementAgent* agent = cluster.getBroker().getManagementAgent(); - if (!agent) - throw Exception(QPID_MSG("Management schema update but management not enabled.")); - agent->setNextObjectId(objectNum); - agent->setBootSequence(bootSequence); - agent->setUuid(id); - agent->setName(vendor, product, instance); -} - -void Connection::config(const std::string& encoded) { - Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - string kind; - buf.getShortString (kind); - if (kind == "link") { - broker::Link::shared_ptr link = - broker::Link::decode(cluster.getBroker().getLinks(), buf); - QPID_LOG(debug, cluster << " updated link " - << link->getHost() << ":" << link->getPort()); - } - else if (kind == "bridge") { - broker::Bridge::shared_ptr bridge = - broker::Bridge::decode(cluster.getBroker().getLinks(), buf); - QPID_LOG(debug, cluster << " updated bridge " << bridge->getName()); - } - else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); -} - -void Connection::doCatchupIoCallbacks() { - // We need to process IO callbacks during the catch-up phase in - // order to service asynchronous completions for messages - // transferred during catch-up. - - if (catchUp) getBrokerConnection()->doIoCallbacks(); -} -}} // Namespace qpid::cluster - diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h deleted file mode 100644 index a0da9efbb8..0000000000 --- a/cpp/src/qpid/cluster/Connection.h +++ /dev/null @@ -1,276 +0,0 @@ -#ifndef QPID_CLUSTER_CONNECTION_H -#define QPID_CLUSTER_CONNECTION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include "OutputInterceptor.h" -#include "McastFrameHandler.h" -#include "UpdateReceiver.h" - -#include "qpid/RefCounted.h" -#include "qpid/broker/Connection.h" -#include "qpid/broker/SecureConnection.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/amqp_0_10/Connection.h" -#include "qpid/sys/AtomicValue.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/SecuritySettings.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/FrameDecoder.h" - -#include <iosfwd> - -namespace qpid { - -namespace framing { class AMQFrame; } - -namespace broker { -class SemanticState; -struct QueuedMessage; -class TxBuffer; -class TxAccept; -} - -namespace cluster { -class Cluster; -class Event; -struct EventFrame; - -/** Intercept broker::Connection calls for shadow and local cluster connections. */ -class Connection : - public RefCounted, - public sys::ConnectionInputHandler, - public framing::AMQP_AllOperations::ClusterConnectionHandler, - private broker::Connection::ErrorListener - -{ - public: - - /** Local connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, - const qpid::sys::SecuritySettings& external); - /** Shadow connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, - const qpid::sys::SecuritySettings& external); - ~Connection(); - - ConnectionId getId() const { return self; } - broker::Connection* getBrokerConnection() { return connection.get(); } - const broker::Connection* getBrokerConnection() const { return connection.get(); } - - /** Local connections may be clients or catch-up connections */ - bool isLocal() const; - - bool isLocalClient() const { return isLocal() && !isCatchUp(); } - - /** True for connections that are shadowing remote broker connections */ - bool isShadow() const; - - /** True if the connection is in "catch-up" mode: building initial broker state. */ - bool isCatchUp() const { return catchUp; } - - /** True if the connection is a completed shared update connection */ - bool isUpdated() const; - - Cluster& getCluster() { return cluster; } - - // ConnectionInputHandler methods - void received(framing::AMQFrame&); - void closed(); - bool doOutput(); - void idleOut() { if (connection.get()) connection->idleOut(); } - void idleIn() { if (connection.get()) connection->idleIn(); } - - // ConnectionCodec methods - called by IO layer with a read buffer. - size_t decode(const char* buffer, size_t size); - - // Called for data delivered from the cluster. - void deliveredFrame(const EventFrame&); - - void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position); - - // ==== Used in catch-up mode to build initial state. - // - // State update methods. - void shadowPrepare(const std::string&); - - void shadowSetUser(const std::string&); - - void sessionState(const framing::SequenceNumber& replayStart, - const framing::SequenceNumber& sendCommandPoint, - const framing::SequenceSet& sentIncomplete, - const framing::SequenceNumber& expected, - const framing::SequenceNumber& received, - const framing::SequenceSet& unknownCompleted, - const SequenceSet& receivedIncomplete); - - void outputTask(uint16_t channel, const std::string& name); - - void shadowReady(uint64_t memberId, - uint64_t connectionId, - const std::string& managementId, - const std::string& username, - const std::string& fragment, - uint32_t sendMax); - - void membership(const framing::FieldTable&, const framing::FieldTable&, - const framing::SequenceNumber& frameSeq); - - void retractOffer(); - - void deliveryRecord(const std::string& queue, - const framing::SequenceNumber& position, - const std::string& tag, - const framing::SequenceNumber& id, - bool acquired, - bool accepted, - bool cancelled, - bool completed, - bool ended, - bool windowing, - bool enqueued, - uint32_t credit); - - void queuePosition(const std::string&, const framing::SequenceNumber&); - void queueFairshareState(const std::string&, const uint8_t priority, const uint8_t count); - void queueObserverState(const std::string&, const std::string&, const framing::FieldTable&); - void expiryId(uint64_t); - - void txStart(); - void txAccept(const framing::SequenceSet&); - void txDequeue(const std::string&); - void txEnqueue(const std::string&); - void txPublish(const framing::Array&, bool); - void txEnd(); - void accumulatedAck(const framing::SequenceSet&); - - // Encoded exchange replication. - void exchange(const std::string& encoded); - - void giveReadCredit(int credit); - void announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, - bool nodict, const std::string& username, - const std::string& initFrames); - void close(); - void abort(); - void deliverClose(); - - OutputInterceptor& getOutput() { return output; } - - void addQueueListener(const std::string& queue, uint32_t listener); - void managementSetupState(uint64_t objectNum, - uint16_t bootSequence, - const framing::Uuid&, - const std::string& vendor, - const std::string& product, - const std::string& instance); - - void config(const std::string& encoded); - - void setSecureConnection ( broker::SecureConnection * sc ); - - void doCatchupIoCallbacks(); - - private: - struct NullFrameHandler : public framing::FrameHandler { - void handle(framing::AMQFrame&) {} - }; - - // Arguments to construct a broker::Connection - struct ConnectionCtor { - sys::ConnectionOutputHandler* out; - broker::Broker& broker; - std::string mgmtId; - qpid::sys::SecuritySettings external; - bool isLink; - uint64_t objectId; - bool shadow; - bool delayManagement; - - ConnectionCtor( - sys::ConnectionOutputHandler* out_, - broker::Broker& broker_, - const std::string& mgmtId_, - const qpid::sys::SecuritySettings& external_, - bool isLink_=false, - uint64_t objectId_=0, - bool shadow_=false, - bool delayManagement_=false - ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), - isLink(isLink_), objectId(objectId_), shadow(shadow_), - delayManagement(delayManagement_) - {} - - std::auto_ptr<broker::Connection> construct() { - return std::auto_ptr<broker::Connection>( - new broker::Connection( - out, broker, mgmtId, external, isLink, objectId, - shadow, delayManagement) - ); - } - }; - - static NullFrameHandler nullFrameHandler; - - // Error listener functions - void connectionError(const std::string&); - void sessionError(uint16_t channel, const std::string&); - - void init(); - bool checkUnsupported(const framing::AMQBody& body); - void deliverDoOutput(uint32_t limit); - - bool checkProtocolHeader(const char*& data, size_t size); - void processInitialFrames(const char*& data, size_t size); - boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); - broker::SessionState& sessionState(); - broker::SemanticState& semanticState(); - broker::QueuedMessage getUpdateMessage(); - void closeUpdated(); - - Cluster& cluster; - ConnectionId self; - bool catchUp; - bool announced; - OutputInterceptor output; - framing::FrameDecoder localDecoder; - ConnectionCtor connectionCtor; - std::auto_ptr<broker::Connection> connection; - framing::SequenceNumber deliverSeq; - framing::ChannelId currentChannel; - boost::shared_ptr<broker::TxBuffer> txBuffer; - bool expectProtocolHeader; - McastFrameHandler mcastFrameHandler; - UpdateReceiver& updateIn; - qpid::broker::SecureConnection* secureConnection; - std::string initialFrames; - - static qpid::sys::AtomicValue<uint64_t> catchUpId; - - friend std::ostream& operator<<(std::ostream&, const Connection&); -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNECTION_H*/ diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp deleted file mode 100644 index d0ba8abfb3..0000000000 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/ConnectionCodec.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/ProxyInputHandler.h" -#include "qpid/broker/Connection.h" -#include "qpid/framing/ConnectionCloseBody.h" -#include "qpid/framing/ConnectionCloseOkBody.h" -#include "qpid/log/Statement.h" -#include "qpid/memory.h" -#include <stdexcept> -#include <boost/utility/in_place_factory.hpp> - -namespace qpid { -namespace cluster { - -using namespace framing; - -sys::ConnectionCodec* -ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, - const std::string& id, - const qpid::sys::SecuritySettings& external) -{ - broker::Broker& broker = cluster.getBroker(); - if (broker.getConnectionCounter().allowConnection()) - { - QPID_LOG(error, "Client max connection count limit exceeded: " - << broker.getOptions().maxConnections << " connection refused"); - return 0; - } - if (v == ProtocolVersion(0, 10)) - return new ConnectionCodec(v, out, id, cluster, false, false, external); - else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10)) // Catch-up connection - return new ConnectionCodec(v, out, id, cluster, true, false, external); - return 0; -} - -// Used for outgoing Link connections -sys::ConnectionCodec* -ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId, - const qpid::sys::SecuritySettings& external) { - return new ConnectionCodec(ProtocolVersion(0,10), out, logId, cluster, false, true, external); -} - -ConnectionCodec::ConnectionCodec( - const ProtocolVersion& v, sys::OutputControl& out, - const std::string& logId, Cluster& cluster, bool catchUp, bool isLink, const qpid::sys::SecuritySettings& external -) : codec(out, logId, isLink), - interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink, external)) -{ - cluster.addLocalConnection(interceptor); - std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); - codec.setInputHandler(ih); - codec.setVersion(v); -} - -ConnectionCodec::~ConnectionCodec() {} - -size_t ConnectionCodec::decode(const char* buffer, size_t size) { - return interceptor->decode(buffer, size); -} - -bool ConnectionCodec::isClosed() const { return codec.isClosed(); } - -size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); } - -bool ConnectionCodec::canEncode() { return codec.canEncode(); } - -void ConnectionCodec::closed() { codec.closed(); } - -ProtocolVersion ConnectionCodec::getVersion() const { return codec.getVersion(); } - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h deleted file mode 100644 index 17a08904d9..0000000000 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef QPID_CLUSTER_CONNCTIONCODEC_H -#define QPID_CLUSTER_CONNCTIONCODEC_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/amqp_0_10/Connection.h" -#include "qpid/cluster/Connection.h" -#include <boost/shared_ptr.hpp> -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace broker { -class Connection; -} - -namespace cluster { -class Cluster; - -/** - * Encapsulates the standard amqp_0_10::ConnectionCodec and sets up - * a cluster::Connection for the connection. - * - * The ConnectionCodec is deleted by the network layer when the - * connection closes. The cluster::Connection needs to be kept - * around until all cluster business on the connection is complete. - * - */ -class ConnectionCodec : public sys::ConnectionCodec { - public: - struct Factory : public sys::ConnectionCodec::Factory { - boost::shared_ptr<sys::ConnectionCodec::Factory> next; - Cluster& cluster; - Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) - : next(f), cluster(c) {} - sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id, - const qpid::sys::SecuritySettings& external); - sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id, - const qpid::sys::SecuritySettings& external); - }; - - ConnectionCodec(const framing::ProtocolVersion&, sys::OutputControl& out, - const std::string& logId, Cluster& c, bool catchUp, bool isLink, - const qpid::sys::SecuritySettings& external); - ~ConnectionCodec(); - - // ConnectionCodec functions. - size_t decode(const char* buffer, size_t size); - size_t encode(const char* buffer, size_t size); - bool canEncode(); - void closed(); - bool isClosed() const; - framing::ProtocolVersion getVersion() const; - void setSecureConnection(broker::SecureConnection* sc) { interceptor->setSecureConnection(sc); } - - private: - amqp_0_10::Connection codec; - boost::intrusive_ptr<cluster::Connection> interceptor; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNCTIONCODEC_H*/ diff --git a/cpp/src/qpid/cluster/Cpg.cpp b/cpp/src/qpid/cluster/Cpg.cpp deleted file mode 100644 index 0856bcd824..0000000000 --- a/cpp/src/qpid/cluster/Cpg.cpp +++ /dev/null @@ -1,280 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/cluster/Cpg.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/posix/PrivatePosix.h" -#include "qpid/log/Statement.h" - -#include <vector> -#include <limits> -#include <iterator> -#include <sstream> - -#include <unistd.h> - -// This is a macro instead of a function because we don't want to -// evaluate the MSG argument unless there is an error. -#define CPG_CHECK(RESULT, MSG) \ - if ((RESULT) != CPG_OK) throw Exception(errorStr((RESULT), (MSG))) - -namespace qpid { -namespace cluster { - -using namespace std; - - - -Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) { - void* cpg=0; - CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance."); - if (!cpg) throw Exception("Cannot get CPG instance."); - return reinterpret_cast<Cpg*>(cpg); -} - -// Applies the same retry-logic to all cpg calls that need it. -void Cpg::callCpg ( CpgOp & c ) { - cpg_error_t result; - unsigned int snooze = 10; - for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++ nth_try ) { - if ( CPG_OK == (result = c.op(handle, & group))) { - break; - } - else if ( result == CPG_ERR_TRY_AGAIN ) { - QPID_LOG(info, "Retrying " << c.opName ); - sys::usleep ( snooze ); - snooze *= 10; - snooze = (snooze <= maxCpgRetrySleep) ? snooze : maxCpgRetrySleep; - } - else break; // Don't retry unless CPG tells us to. - } - - if ( result != CPG_OK ) - CPG_CHECK(result, c.msg(group)); -} - -// Global callback functions. -void Cpg::globalDeliver ( - cpg_handle_t handle, - const struct cpg_name *group, - uint32_t nodeid, - uint32_t pid, - void* msg, - size_t msg_len) -{ - cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); -} - -void Cpg::globalConfigChange( - cpg_handle_t handle, - const struct cpg_name *group, - const struct cpg_address *members, size_t nMembers, - const struct cpg_address *left, size_t nLeft, - const struct cpg_address *joined, size_t nJoined -) -{ - cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); -} - -void Cpg::globalDeliver ( - cpg_handle_t handle, - struct cpg_name *group, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len) -{ - cpgFromHandle(handle)->handler.deliver(handle, group, nodeid, pid, msg, msg_len); -} - -void Cpg::globalConfigChange( - cpg_handle_t handle, - struct cpg_name *group, - struct cpg_address *members, int nMembers, - struct cpg_address *left, int nLeft, - struct cpg_address *joined, int nJoined -) -{ - cpgFromHandle(handle)->handler.configChange(handle, group, members, nMembers, left, nLeft, joined, nJoined); -} - -int Cpg::getFd() { - int fd; - CPG_CHECK(cpg_fd_get(handle, &fd), "Cannot get CPG file descriptor"); - return fd; -} - -Cpg::Cpg(Handler& h) : IOHandle(new sys::IOHandlePrivate), handler(h), isShutdown(false) { - cpg_callbacks_t callbacks; - ::memset(&callbacks, 0, sizeof(callbacks)); - callbacks.cpg_deliver_fn = &globalDeliver; - callbacks.cpg_confchg_fn = &globalConfigChange; - - QPID_LOG(notice, "Initializing CPG"); - cpg_error_t err = cpg_initialize(&handle, &callbacks); - int retries = 6; // FIXME aconway 2009-08-06: make this configurable. - while (err == CPG_ERR_TRY_AGAIN && --retries) { - QPID_LOG(notice, "Re-trying CPG initialization."); - sys::sleep(5); - err = cpg_initialize(&handle, &callbacks); - } - CPG_CHECK(err, "Failed to initialize CPG."); - CPG_CHECK(cpg_context_set(handle, this), "Cannot set CPG context"); - // Note: CPG is currently unix-specific. If CPG is ported to - // windows then this needs to be refactored into - // qpid::sys::<platform> - IOHandle::impl->fd = getFd(); -} - -Cpg::~Cpg() { - try { - shutdown(); - } catch (const std::exception& e) { - QPID_LOG(error, "Error during CPG shutdown: " << e.what()); - } -} - -void Cpg::join(const std::string& name) { - group = name; - callCpg ( cpgJoinOp ); -} - -void Cpg::leave() { - callCpg ( cpgLeaveOp ); -} - - - - -bool Cpg::mcast(const iovec* iov, int iovLen) { - // Check for flow control - cpg_flow_control_state_t flowState; - CPG_CHECK(cpg_flow_control_state_get(handle, &flowState), "Cannot get CPG flow control status."); - if (flowState == CPG_FLOW_CONTROL_ENABLED) - return false; - - cpg_error_t result; - do { - result = cpg_mcast_joined(handle, CPG_TYPE_AGREED, const_cast<iovec*>(iov), iovLen); - if (result != CPG_ERR_TRY_AGAIN) CPG_CHECK(result, cantMcastMsg(group)); - } while(result == CPG_ERR_TRY_AGAIN); - return true; -} - -void Cpg::shutdown() { - if (!isShutdown) { - QPID_LOG(debug,"Shutting down CPG"); - isShutdown=true; - - callCpg ( cpgFinalizeOp ); - } -} - -void Cpg::dispatchOne() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ONE), "Error in CPG dispatch"); -} - -void Cpg::dispatchAll() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_ALL), "Error in CPG dispatch"); -} - -void Cpg::dispatchBlocking() { - CPG_CHECK(cpg_dispatch(handle,CPG_DISPATCH_BLOCKING), "Error in CPG dispatch"); -} - -string Cpg::errorStr(cpg_error_t err, const std::string& msg) { - std::ostringstream os; - os << msg << ": "; - switch (err) { - case CPG_OK: os << "ok"; break; - case CPG_ERR_LIBRARY: os << "library"; break; - case CPG_ERR_TIMEOUT: os << "timeout"; break; - case CPG_ERR_TRY_AGAIN: os << "try again"; break; - case CPG_ERR_INVALID_PARAM: os << "invalid param"; break; - case CPG_ERR_NO_MEMORY: os << "no memory"; break; - case CPG_ERR_BAD_HANDLE: os << "bad handle"; break; - case CPG_ERR_ACCESS: os << "access denied. You may need to set your group ID to 'ais'"; break; - case CPG_ERR_NOT_EXIST: os << "not exist"; break; - case CPG_ERR_EXIST: os << "exist"; break; - case CPG_ERR_NOT_SUPPORTED: os << "not supported"; break; - case CPG_ERR_SECURITY: os << "security"; break; - case CPG_ERR_TOO_MANY_GROUPS: os << "too many groups"; break; - default: os << ": unknown cpg error " << err; - }; - os << " (" << err << ")"; - return os.str(); -} - -std::string Cpg::cantJoinMsg(const Name& group) { - return "Cannot join CPG group "+group.str(); -} - -std::string Cpg::cantFinalizeMsg(const Name& group) { - return "Cannot finalize CPG group "+group.str(); -} - -std::string Cpg::cantLeaveMsg(const Name& group) { - return "Cannot leave CPG group "+group.str(); -} - -std::string Cpg::cantMcastMsg(const Name& group) { - return "Cannot mcast to CPG group "+group.str(); -} - -MemberId Cpg::self() const { - unsigned int nodeid; - CPG_CHECK(cpg_local_get(handle, &nodeid), "Cannot get local CPG identity"); - return MemberId(nodeid, getpid()); -} - -namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } } - -ostream& operator<<(ostream& out, const MemberId& id) { - if (id.first) { - out << byte(id.first, 0) << "." - << byte(id.first, 1) << "." - << byte(id.first, 2) << "." - << byte(id.first, 3) - << ":"; - } - return out << id.second; -} - -ostream& operator<<(ostream& o, const ConnectionId& c) { - return o << c.first << "-" << c.second; -} - -std::string MemberId::str() const { - char s[8]; - uint32_t x; - x = htonl(first); - ::memcpy(s, &x, 4); - x = htonl(second); - ::memcpy(s+4, &x, 4); - return std::string(s,8); -} - -MemberId::MemberId(const std::string& s) { - uint32_t x; - memcpy(&x, &s[0], 4); - first = ntohl(x); - memcpy(&x, &s[4], 4); - second = ntohl(x); -} -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cpg.h b/cpp/src/qpid/cluster/Cpg.h deleted file mode 100644 index 6b81c602bd..0000000000 --- a/cpp/src/qpid/cluster/Cpg.h +++ /dev/null @@ -1,236 +0,0 @@ -#ifndef CPG_H -#define CPG_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/Exception.h" -#include "qpid/cluster/types.h" -#include "qpid/sys/IOHandle.h" -#include "qpid/sys/Mutex.h" - -#include <boost/scoped_ptr.hpp> - -#include <cassert> -#include <string.h> - -namespace qpid { -namespace cluster { - -/** - * Lightweight C++ interface to cpg.h operations. - * - * Manages a single CPG handle, initialized in ctor, finialzed in destructor. - * On error all functions throw Cpg::Exception. - * - */ - -class Cpg : public sys::IOHandle { - public: - struct Exception : public ::qpid::Exception { - Exception(const std::string& msg) : ::qpid::Exception(msg) {} - }; - - struct Name : public cpg_name { - Name() { length = 0; } - Name(const char* s) { copy(s, strlen(s)); } - Name(const char* s, size_t n) { copy(s,n); } - Name(const std::string& s) { copy(s.data(), s.size()); } - void copy(const char* s, size_t n) { - assert(n < CPG_MAX_NAME_LENGTH); - memcpy(value, s, n); - length=n; - } - - std::string str() const { return std::string(value, length); } - }; - - static std::string str(const cpg_name& n) { - return std::string(n.value, n.length); - } - - struct Handler { - virtual ~Handler() {}; - virtual void deliver( - cpg_handle_t /*handle*/, - const struct cpg_name *group, - uint32_t /*nodeid*/, - uint32_t /*pid*/, - void* /*msg*/, - int /*msg_len*/) = 0; - - virtual void configChange( - cpg_handle_t /*handle*/, - const struct cpg_name */*group*/, - const struct cpg_address */*members*/, int /*nMembers*/, - const struct cpg_address */*left*/, int /*nLeft*/, - const struct cpg_address */*joined*/, int /*nJoined*/ - ) = 0; - }; - - /** Open a CPG handle. - *@param handler for CPG events. - */ - Cpg(Handler&); - - /** Destructor calls shutdown if not already calledx. */ - ~Cpg(); - - /** Disconnect from CPG */ - void shutdown(); - - void dispatchOne(); - void dispatchAll(); - void dispatchBlocking(); - - void join(const std::string& group); - void leave(); - - /** Multicast to the group. NB: must not be called concurrently. - * - *@return true if the message was multi-cast, false if - * it was not sent due to flow control. - */ - bool mcast(const iovec* iov, int iovLen); - - cpg_handle_t getHandle() const { return handle; } - - MemberId self() const; - - int getFd(); - - private: - - // Maximum number of retries for cog functions that can tell - // us to "try again later". - static const unsigned int cpgRetries = 5; - - // Don't let sleep-time between cpg retries to go above 0.1 second. - static const unsigned int maxCpgRetrySleep = 100000; - - - // Base class for the Cpg operations that need retry capability. - struct CpgOp { - std::string opName; - - CpgOp ( std::string opName ) - : opName(opName) { } - - virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0; - virtual std::string msg(const Name&) = 0; - virtual ~CpgOp ( ) { } - }; - - - struct CpgJoinOp : public CpgOp { - CpgJoinOp ( ) - : CpgOp ( std::string("cpg_join") ) { } - - cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { - return cpg_join ( handle, group ); - } - - std::string msg(const Name& name) { return cantJoinMsg(name); } - }; - - struct CpgLeaveOp : public CpgOp { - CpgLeaveOp ( ) - : CpgOp ( std::string("cpg_leave") ) { } - - cpg_error_t op(cpg_handle_t handle, struct cpg_name * group) { - return cpg_leave ( handle, group ); - } - - std::string msg(const Name& name) { return cantLeaveMsg(name); } - }; - - struct CpgFinalizeOp : public CpgOp { - CpgFinalizeOp ( ) - : CpgOp ( std::string("cpg_finalize") ) { } - - cpg_error_t op(cpg_handle_t handle, struct cpg_name *) { - return cpg_finalize ( handle ); - } - - std::string msg(const Name& name) { return cantFinalizeMsg(name); } - }; - - // This fn standardizes retry policy across all Cpg ops that need it. - void callCpg ( CpgOp & ); - - CpgJoinOp cpgJoinOp; - CpgLeaveOp cpgLeaveOp; - CpgFinalizeOp cpgFinalizeOp; - - static std::string errorStr(cpg_error_t err, const std::string& msg); - static std::string cantJoinMsg(const Name&); - static std::string cantLeaveMsg(const Name&); - static std::string cantMcastMsg(const Name&); - static std::string cantFinalizeMsg(const Name&); - - static Cpg* cpgFromHandle(cpg_handle_t); - - // New versions for corosync 1.0 and higher - static void globalDeliver( - cpg_handle_t handle, - const struct cpg_name *group, - uint32_t nodeid, - uint32_t pid, - void* msg, - size_t msg_len); - - static void globalConfigChange( - cpg_handle_t handle, - const struct cpg_name *group, - const struct cpg_address *members, size_t nMembers, - const struct cpg_address *left, size_t nLeft, - const struct cpg_address *joined, size_t nJoined - ); - - // Old versions for openais - static void globalDeliver( - cpg_handle_t handle, - struct cpg_name *group, - uint32_t nodeid, - uint32_t pid, - void* msg, - int msg_len); - - static void globalConfigChange( - cpg_handle_t handle, - struct cpg_name *group, - struct cpg_address *members, int nMembers, - struct cpg_address *left, int nLeft, - struct cpg_address *joined, int nJoined - ); - - cpg_handle_t handle; - Handler& handler; - bool isShutdown; - Name group; - sys::Mutex dispatchLock; -}; - -inline bool operator==(const cpg_name& a, const cpg_name& b) { - return a.length==b.length && strncmp(a.value, b.value, a.length) == 0; -} -inline bool operator!=(const cpg_name& a, const cpg_name& b) { return !(a == b); } - -}} // namespace qpid::cluster - -#endif /*!CPG_H*/ diff --git a/cpp/src/qpid/cluster/Decoder.cpp b/cpp/src/qpid/cluster/Decoder.cpp deleted file mode 100644 index 23ba372d78..0000000000 --- a/cpp/src/qpid/cluster/Decoder.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/Decoder.h" -#include "qpid/cluster/EventFrame.h" -#include "qpid/framing/ClusterConnectionDeliverCloseBody.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" - - -namespace qpid { -namespace cluster { - -void Decoder::decode(const EventHeader& eh, const char* data) { - sys::Mutex::ScopedLock l(lock); - assert(eh.getType() == DATA); // Only handle connection data events. - const char* cp = static_cast<const char*>(data); - framing::Buffer buf(const_cast<char*>(cp), eh.getSize()); - framing::FrameDecoder& decoder = map[eh.getConnectionId()]; - if (decoder.decode(buf)) { // Decoded a frame - framing::AMQFrame frame(decoder.getFrame()); - while (decoder.decode(buf)) { - callback(EventFrame(eh, frame)); - frame = decoder.getFrame(); - } - // Set read-credit on the last frame ending in this event. - // Credit will be given when this frame is processed. - callback(EventFrame(eh, frame, 1)); - } - else { - // We must give 1 unit read credit per event. - // This event does not complete any frames so - // send an empty frame with the read credit. - callback(EventFrame(eh, framing::AMQFrame(), 1)); - } -} - -void Decoder::erase(const ConnectionId& c) { - sys::Mutex::ScopedLock l(lock); - map.erase(c); -} - -framing::FrameDecoder& Decoder::get(const ConnectionId& c) { - sys::Mutex::ScopedLock l(lock); - return map[c]; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h deleted file mode 100644 index 3b5ada4a81..0000000000 --- a/cpp/src/qpid/cluster/Decoder.h +++ /dev/null @@ -1,59 +0,0 @@ -#ifndef QPID_CLUSTER_DECODER_H -#define QPID_CLUSTER_DECODER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/framing/FrameDecoder.h" -#include "qpid/sys/Mutex.h" -#include <boost/function.hpp> -#include <map> - -namespace qpid { -namespace cluster { - -struct EventFrame; -class EventHeader; - -/** - * A map of decoders for connections. - */ -class Decoder -{ - public: - typedef boost::function<void(const EventFrame&)> FrameHandler; - - Decoder(FrameHandler fh) : callback(fh) {} - void decode(const EventHeader& eh, const char* data); - void erase(const ConnectionId&); - framing::FrameDecoder& get(const ConnectionId& c); - - private: - typedef std::map<ConnectionId, framing::FrameDecoder> Map; - sys::Mutex lock; - Map map; - void process(const EventFrame&); - FrameHandler callback; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/cpp/src/qpid/cluster/Dispatchable.h b/cpp/src/qpid/cluster/Dispatchable.h deleted file mode 100644 index e7f0df4218..0000000000 --- a/cpp/src/qpid/cluster/Dispatchable.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef QPID_CLUSTER_DISPATCHABLE_H -#define QPID_CLUSTER_DISPATCHABLE_H - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -namespace qpid { -namespace cluster { - -/** - * Interface for classes that have some "events" that need dispatching - * in a thread. - */ -class Dispatchable -{ - public: - virtual ~Dispatchable() {} - - /** Dispatch one event in current thread. */ - virtual void dispatchOne() = 0; - /** Dispatch all available events, don't block. */ - virtual void dispatchAll() = 0; - /** Blocking loop to dispatch cluster events */ - virtual void dispatchBlocking() = 0; - - /** Wait for at least one event, then dispatch all available events. - * Don't block. Useful for tests. - */ - virtual void dispatchSome() { dispatchOne(); dispatchAll(); } - -}; - -}} // namespace qpid::cluster - - - -#endif /*!QPID_CLUSTER_DISPATCHABLE_H*/ diff --git a/cpp/src/qpid/cluster/ErrorCheck.cpp b/cpp/src/qpid/cluster/ErrorCheck.cpp deleted file mode 100644 index be671c0f48..0000000000 --- a/cpp/src/qpid/cluster/ErrorCheck.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/ErrorCheck.h" -#include "qpid/cluster/EventFrame.h" -#include "qpid/cluster/ClusterMap.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/framing/ClusterErrorCheckBody.h" -#include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/log/Statement.h" - -#include <algorithm> - -namespace qpid { -namespace cluster { - -using namespace std; -using namespace framing; -using namespace framing::cluster; - -ErrorCheck::ErrorCheck(Cluster& c) - : cluster(c), mcast(c.getMulticast()), frameSeq(0), type(ERROR_TYPE_NONE), connection(0) -{} - -void ErrorCheck::error( - Connection& c, ErrorType t, framing::SequenceNumber seq, const MemberSet& ms, - const std::string& msg) -{ - // Detected a local error, inform cluster and set error state. - assert(t != ERROR_TYPE_NONE); // Must be an error. - assert(type == ERROR_TYPE_NONE); // Can't be called when already in an error state. - type = t; - unresolved = ms; - frameSeq = seq; - connection = &c; - message = msg; - QPID_LOG(debug, cluster<< (type == ERROR_TYPE_SESSION ? " channel" : " connection") - << " error " << frameSeq << " on " << c - << " must be resolved with: " << unresolved - << ": " << message); - mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), type, frameSeq), cluster.getId()); - // If there are already frames queued up by a previous error, review - // them with respect to this new error. - for (FrameQueue::iterator i = frames.begin(); i != frames.end(); i = review(i)) - ; -} - -void ErrorCheck::delivered(const EventFrame& e) { - frames.push_back(e); - review(frames.end()-1); -} - -// Review a frame in the queue with respect to the current error. -ErrorCheck::FrameQueue::iterator ErrorCheck::review(const FrameQueue::iterator& i) { - FrameQueue::iterator next = i+1; - if(!isUnresolved() || !i->frame.getBody() || !i->frame.getMethod()) - return next; // Only interested in control frames while unresolved. - const AMQMethodBody* method = i->frame.getMethod(); - if (method->isA<const ClusterErrorCheckBody>()) { - const ClusterErrorCheckBody* errorCheck = - static_cast<const ClusterErrorCheckBody*>(method); - - if (errorCheck->getFrameSeq() == frameSeq) { // Addresses current error - next = frames.erase(i); // Drop matching error check controls - if (errorCheck->getType() < type) { // my error is worse than his - QPID_LOG(critical, cluster - << " local error " << frameSeq << " did not occur on member " - << i->getMemberId() - << ": " << message); - throw Exception( - QPID_MSG("local error did not occur on all cluster members " << ": " << message)); - } - else { // his error is worse/same as mine. - QPID_LOG(debug, cluster << " error " << frameSeq - << " resolved with " << i->getMemberId()); - unresolved.erase(i->getMemberId()); - checkResolved(); - } - } - else if (errorCheck->getFrameSeq() < frameSeq && errorCheck->getType() != NONE - && i->connectionId.getMember() != cluster.getId()) - { - // This error occured before the current error so we - // have processed past it. - next = frames.erase(i); // Drop the error check control - respondNone(i->connectionId.getMember(), errorCheck->getType(), - errorCheck->getFrameSeq()); - } - // if errorCheck->getFrameSeq() > frameSeq then leave it in the queue. - } - else if (method->isA<const ClusterConfigChangeBody>()) { - const ClusterConfigChangeBody* configChange = - static_cast<const ClusterConfigChangeBody*>(method); - if (configChange) { - MemberSet members(decodeMemberSet(configChange->getMembers())); - QPID_LOG(debug, cluster << " apply config change to error " - << frameSeq << ": " << members); - MemberSet intersect; - set_intersection(members.begin(), members.end(), - unresolved.begin(), unresolved.end(), - inserter(intersect, intersect.begin())); - unresolved.swap(intersect); - checkResolved(); - } - } - return next; -} - -void ErrorCheck::checkResolved() { - if (unresolved.empty()) { // No more potentially conflicted members, we're clear. - type = ERROR_TYPE_NONE; - QPID_LOG(debug, cluster << " error " << frameSeq << " resolved."); - } - else - QPID_LOG(debug, cluster << " error " << frameSeq - << " must be resolved with " << unresolved); -} - -EventFrame ErrorCheck::getNext() { - assert(canProcess()); - EventFrame e(frames.front()); - frames.pop_front(); - return e; -} - -void ErrorCheck::respondNone(const MemberId& from, uint8_t type, framing::SequenceNumber frameSeq) { - // Don't respond to non-errors or to my own errors. - if (type == ERROR_TYPE_NONE || from == cluster.getId()) - return; - QPID_LOG(debug, cluster << " error " << frameSeq << " did not occur locally."); - mcast.mcastControl( - ClusterErrorCheckBody(ProtocolVersion(), ERROR_TYPE_NONE, frameSeq), - cluster.getId() - ); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ErrorCheck.h b/cpp/src/qpid/cluster/ErrorCheck.h deleted file mode 100644 index a417b2ec25..0000000000 --- a/cpp/src/qpid/cluster/ErrorCheck.h +++ /dev/null @@ -1,90 +0,0 @@ -#ifndef QPID_CLUSTER_ERRORCHECK_H -#define QPID_CLUSTER_ERRORCHECK_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/MemberSet.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/SequenceNumber.h" -#include <boost/function.hpp> -#include <deque> -#include <set> - -namespace qpid { -namespace cluster { - -struct EventFrame; -class Cluster; -class Multicaster; -class Connection; - -/** - * Error checking logic. - * - * When an error occurs queue up frames until we can determine if all - * nodes experienced the error. If not, we shut down. - */ -class ErrorCheck -{ - public: - typedef framing::cluster::ErrorType ErrorType; - typedef framing::SequenceNumber SequenceNumber; - - ErrorCheck(Cluster&); - - /** A local error has occured */ - void error(Connection&, ErrorType, SequenceNumber frameSeq, const MemberSet&, - const std::string& msg); - - /** Called when a frame is delivered */ - void delivered(const EventFrame&); - - /**@pre canProcess **/ - EventFrame getNext(); - - bool canProcess() const { return type == NONE && !frames.empty(); } - - bool isUnresolved() const { return type != NONE; } - - /** Respond to an error check saying we had no error. */ - void respondNone(const MemberId&, uint8_t type, SequenceNumber frameSeq); - - private: - static const ErrorType NONE = framing::cluster::ERROR_TYPE_NONE; - typedef std::deque<EventFrame> FrameQueue; - FrameQueue::iterator review(const FrameQueue::iterator&); - void checkResolved(); - - Cluster& cluster; - Multicaster& mcast; - FrameQueue frames; - MemberSet unresolved; - SequenceNumber frameSeq; - ErrorType type; - Connection* connection; - std::string message; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_ERRORCHECK_H*/ diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp deleted file mode 100644 index da2bc89d8c..0000000000 --- a/cpp/src/qpid/cluster/Event.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/types.h" -#include "qpid/cluster/Event.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/framing/Buffer.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/RefCountedBuffer.h" -#include "qpid/assert.h" -#include <ostream> -#include <iterator> -#include <algorithm> - -namespace qpid { -namespace cluster { - -using framing::Buffer; -using framing::AMQFrame; - -const size_t EventHeader::HEADER_SIZE = - sizeof(uint8_t) + // type - sizeof(uint64_t) + // connection pointer only, CPG provides member ID. - sizeof(uint32_t) // payload size - ; - -EventHeader::EventHeader(EventType t, const ConnectionId& c, size_t s) - : type(t), connectionId(c), size(s) {} - - -Event::Event() {} - -Event::Event(EventType t, const ConnectionId& c, size_t s) - : EventHeader(t,c,s), store(RefCountedBuffer::create(s+HEADER_SIZE)) -{} - -void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { - QPID_ASSERT(buf.available() >= HEADER_SIZE); - type = (EventType)buf.getOctet(); - QPID_ASSERT(type == DATA || type == CONTROL); - connectionId = ConnectionId(m, buf.getLongLong()); - size = buf.getLong(); -} - -Event Event::decodeCopy(const MemberId& m, framing::Buffer& buf) { - Event e; - e.decode(m, buf); // Header - QPID_ASSERT(buf.available() >= e.size); - e.store = RefCountedBuffer::create(e.size + HEADER_SIZE); - memcpy(e.getData(), buf.getPointer() + buf.getPosition(), e.size); - return e; -} - -Event Event::control(const framing::AMQFrame& f, const ConnectionId& cid) { - Event e(CONTROL, cid, f.encodedSize()); - Buffer buf(e); - f.encode(buf); - return e; -} - -Event Event::control(const framing::AMQBody& body, const ConnectionId& cid) { - return control(framing::AMQFrame(body), cid); -} - -iovec Event::toIovec() const { - encodeHeader(); - iovec iov = { const_cast<char*>(getStore()), getStoreSize() }; - return iov; -} - -void EventHeader::encode(Buffer& b) const { - b.putOctet(type); - b.putLongLong(connectionId.getNumber()); - b.putLong(size); -} - -// Encode my header in my buffer. -void Event::encodeHeader () const { - Buffer b(const_cast<char*>(getStore()), HEADER_SIZE); - encode(b); - assert(b.getPosition() == HEADER_SIZE); -} - -Event::operator Buffer() const { - return Buffer(const_cast<char*>(getData()), getSize()); -} - -const AMQFrame& Event::getFrame() const { - assert(type == CONTROL); - if (!frame.getBody()) { - Buffer buf(*this); - QPID_ASSERT(frame.decode(buf)); - } - return frame; -} - -static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; - -std::ostream& operator<< (std::ostream& o, EventType t) { - return o << EVENT_TYPE_NAMES[t]; -} - -std::ostream& operator<< (std::ostream& o, const EventHeader& e) { - return o << "Event[" << e.getConnectionId() << " " << e.getType() - << " " << e.getSize() << " bytes]"; -} - -std::ostream& operator<< (std::ostream& o, const Event& e) { - o << "Event[" << e.getConnectionId() << " "; - if (e.getType() == CONTROL) - o << e.getFrame(); - else - o << " data " << e.getSize() << " bytes"; - return o << "]"; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h deleted file mode 100644 index 13283edff7..0000000000 --- a/cpp/src/qpid/cluster/Event.h +++ /dev/null @@ -1,116 +0,0 @@ -#ifndef QPID_CLUSTER_EVENT_H -#define QPID_CLUSTER_EVENT_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/BufferRef.h" -#include "qpid/framing/AMQFrame.h" -#include <sys/uio.h> // For iovec -#include <iosfwd> - -#include "qpid/cluster/types.h" - -namespace qpid { - -namespace framing { -class AMQBody; -class AMQFrame; -class Buffer; -} - -namespace cluster { - -/** Header data for a multicast event */ -class EventHeader { - public: - EventHeader(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t size=0); - void decode(const MemberId& m, framing::Buffer&); - void encode(framing::Buffer&) const; - - EventType getType() const { return type; } - ConnectionId getConnectionId() const { return connectionId; } - MemberId getMemberId() const { return connectionId.getMember(); } - - /** Size of payload data, excluding header. */ - size_t getSize() const { return size; } - /** Size of header + payload. */ - size_t getStoreSize() const { return size + HEADER_SIZE; } - - bool isCluster() const { return connectionId.getNumber() == 0; } - bool isConnection() const { return connectionId.getNumber() != 0; } - bool isControl() const { return type == CONTROL; } - - protected: - static const size_t HEADER_SIZE; - - EventType type; - ConnectionId connectionId; - size_t size; -}; - -/** - * Events are sent to/received from the cluster. - * Refcounted so they can be stored on queues. - */ -class Event : public EventHeader { - public: - Event(); - /** Create an event with a buffer that can hold size bytes plus an event header. */ - Event(EventType t, const ConnectionId& c, size_t); - - /** Create an event copied from delivered data. */ - static Event decodeCopy(const MemberId& m, framing::Buffer&); - - /** Create a control event. */ - static Event control(const framing::AMQBody&, const ConnectionId&); - - /** Create a control event. */ - static Event control(const framing::AMQFrame&, const ConnectionId&); - - // Data excluding header. - char* getData() { return store.begin() + HEADER_SIZE; } - const char* getData() const { return store.begin() + HEADER_SIZE; } - - // Store including header - char* getStore() { return store.begin(); } - const char* getStore() const { return store.begin(); } - - const framing::AMQFrame& getFrame() const; - - operator framing::Buffer() const; - - iovec toIovec() const; - - private: - void encodeHeader() const; - - BufferRef store; - mutable framing::AMQFrame frame; -}; - -std::ostream& operator << (std::ostream&, const Event&); -std::ostream& operator << (std::ostream&, const EventHeader&); - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_EVENT_H*/ diff --git a/cpp/src/qpid/cluster/EventFrame.cpp b/cpp/src/qpid/cluster/EventFrame.cpp deleted file mode 100644 index 5fbe1fe57c..0000000000 --- a/cpp/src/qpid/cluster/EventFrame.cpp +++ /dev/null @@ -1,41 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/EventFrame.h" -#include "qpid/cluster/Connection.h" - -namespace qpid { -namespace cluster { - -EventFrame::EventFrame() {} - -EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) - : connectionId(e.getConnectionId()), frame(f), readCredit(rc), type(e.getType()) -{} - -std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - if (e.frame.getBody()) o << e.frame; - else o << "null-frame"; - o << " " << e.type << " " << e.connectionId; - if (e.readCredit) o << " read-credit=" << e.readCredit; - return o; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h deleted file mode 100644 index 6b702a9bf8..0000000000 --- a/cpp/src/qpid/cluster/EventFrame.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_CLUSTER_EVENTFRAME_H -#define QPID_CLUSTER_EVENTFRAME_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/cluster/Event.h" -#include "qpid/framing/AMQFrame.h" -#include <boost/intrusive_ptr.hpp> -#include <iosfwd> - -namespace qpid { -namespace cluster { - -/** - * A frame decoded from an Event. - */ -struct EventFrame -{ - public: - EventFrame(); - - EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0); - - bool isCluster() const { return connectionId.getNumber() == 0; } - bool isConnection() const { return connectionId.getNumber() != 0; } - bool isLastInEvent() const { return readCredit; } - MemberId getMemberId() const { return connectionId.getMember(); } - - - ConnectionId connectionId; - framing::AMQFrame frame; - int readCredit; ///< last frame in an event, give credit when processed. - EventType type; -}; - -std::ostream& operator<<(std::ostream& o, const EventFrame& e); - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_EVENTFRAME_H*/ diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp deleted file mode 100644 index d9a7b0122a..0000000000 --- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ /dev/null @@ -1,126 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Message.h" -#include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/ClusterMessageExpiredBody.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/Timer.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, sys::Timer& t) - : expiryId(1), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} - -struct ExpiryTask : public sys::TimerTask { - ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when,"ExpiryPolicy"), expiryPolicy(policy), expiryId(id) {} - void fire() { expiryPolicy->sendExpire(expiryId); } - boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t expiryId; -}; - -// Called while receiving an update -void ExpiryPolicy::setId(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expiryId = id; -} - -// Called while giving an update -uint64_t ExpiryPolicy::getId() const { - sys::Mutex::ScopedLock l(lock); - return expiryId; -} - -// Called in enqueuing connection thread -void ExpiryPolicy::willExpire(broker::Message& m) { - uint64_t id; - { - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - // - sys::Mutex::ScopedLock l(lock); - id = expiryId++; - if (!id) { // This is an update of an already-expired message. - m.setExpiryPolicy(expiredPolicy); - } - else { - assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); - // If this is an update, the id may already exist - unexpiredById.insert(IdMessageMap::value_type(id, &m)); - unexpiredByMessage[&m] = id; - } - } - timer.add(new ExpiryTask(this, id, m.getExpiration())); -} - -// Called in dequeueing connection thread -void ExpiryPolicy::forget(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - assert(i != unexpiredByMessage.end()); - unexpiredById.erase(i->second); - unexpiredByMessage.erase(i); -} - -// Called in dequeueing connection or cleanup thread. -bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); -} - -// Called in timer thread -void ExpiryPolicy::sendExpire(uint64_t id) { - { - sys::Mutex::ScopedLock l(lock); - // Don't multicast an expiry notice if message is already forgotten. - if (unexpiredById.find(id) == unexpiredById.end()) return; - } - mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); -} - -// Called in CPG deliver thread. -void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id); - IdMessageMap::iterator i = expired.first; - while (i != expired.second) { - i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; - unexpiredByMessage.erase(i->second); - unexpiredById.erase(i++); - } -} - -// Called in update thread on the updater. -boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - MessageIdMap::iterator i = unexpiredByMessage.find(&m); - return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; -} - -bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } -void ExpiryPolicy::Expired::willExpire(broker::Message&) { } - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h deleted file mode 100644 index 77a656aa68..0000000000 --- a/cpp/src/qpid/cluster/ExpiryPolicy.h +++ /dev/null @@ -1,93 +0,0 @@ -#ifndef QPID_CLUSTER_EXPIRYPOLICY_H -#define QPID_CLUSTER_EXPIRYPOLICY_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/broker/ExpiryPolicy.h" -#include "qpid/sys/Mutex.h" -#include <boost/function.hpp> -#include <boost/intrusive_ptr.hpp> -#include <boost/optional.hpp> -#include <map> - -namespace qpid { - -namespace broker { -class Message; -} - -namespace sys { -class Timer; -} - -namespace cluster { -class Multicaster; - -/** - * Cluster expiry policy - */ -class ExpiryPolicy : public broker::ExpiryPolicy -{ - public: - ExpiryPolicy(Multicaster&, const MemberId&, sys::Timer&); - - void willExpire(broker::Message&); - bool hasExpired(broker::Message&); - void forget(broker::Message&); - - // Send expiration notice to cluster. - void sendExpire(uint64_t); - - // Cluster delivers expiry notice. - void deliverExpire(uint64_t); - - void setId(uint64_t id); - uint64_t getId() const; - - boost::optional<uint64_t> getId(broker::Message&); - - private: - typedef std::map<broker::Message*, uint64_t> MessageIdMap; - // When messages are fanned out to multiple queues, update sends - // them as independenty messages so we can have multiple messages - // with the same expiry ID. - typedef std::multimap<uint64_t, broker::Message*> IdMessageMap; - - struct Expired : public broker::ExpiryPolicy { - bool hasExpired(broker::Message&); - void willExpire(broker::Message&); - }; - - mutable sys::Mutex lock; - MessageIdMap unexpiredByMessage; - IdMessageMap unexpiredById; - uint64_t expiryId; - boost::intrusive_ptr<Expired> expiredPolicy; - Multicaster& mcast; - MemberId memberId; - sys::Timer& timer; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_EXPIRYPOLICY_H*/ diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp deleted file mode 100644 index 84232dac1b..0000000000 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/FailoverExchange.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/broker/Queue.h" -#include "qpid/framing/MessageProperties.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/AMQHeaderBody.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/Array.h" -#include <boost/bind.hpp> -#include <algorithm> - -namespace qpid { -namespace cluster { -using namespace std; - -using namespace broker; -using namespace framing; - -const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { - if (mgmtExchange != 0) - mgmtExchange->set_type(typeName); -} - -void FailoverExchange::setUrls(const vector<Url>& u) { - Lock l(lock); - urls = u; -} - -void FailoverExchange::updateUrls(const vector<Url>& u) { - Lock l(lock); - urls=u; - if (urls.empty()) return; - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); -} - -string FailoverExchange::getType() const { return typeName; } - -bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { - Lock l(lock); - sendUpdate(queue); - return queues.insert(queue).second; -} - -bool FailoverExchange::unbind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { - Lock l(lock); - return queues.erase(queue); -} - -bool FailoverExchange::isBound(Queue::shared_ptr queue, const string* const, const framing::FieldTable*) { - Lock l(lock); - return queues.find(queue) != queues.end(); -} - -void FailoverExchange::route(Deliverable&, const string& , const framing::FieldTable* ) { - QPID_LOG(warning, "Message received by exchange " << typeName << " ignoring"); -} - -void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { - // Called with lock held. - if (urls.empty()) return; - framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) - array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); - const ProtocolVersion v; - boost::intrusive_ptr<Message> msg(new Message); - AMQFrame command(MessageTransferBody(v, typeName, 1, 0)); - command.setLastSegment(false); - msg->getFrames().append(command); - AMQHeaderBody header; - header.get<MessageProperties>(true)->setContentLength(0); - header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); - AMQFrame headerFrame(header); - headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); - DeliverableMessage(msg).deliverTo(queue); -} - - -}} // namespace cluster diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h deleted file mode 100644 index 2e1edfc0ae..0000000000 --- a/cpp/src/qpid/cluster/FailoverExchange.h +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef QPID_CLUSTER_FAILOVEREXCHANGE_H -#define QPID_CLUSTER_FAILOVEREXCHANGE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Exchange.h" -#include "qpid/broker/DeliverableMessage.h" -#include "qpid/Url.h" - -#include <vector> -#include <set> - -namespace qpid { -namespace cluster { - -/** - * Failover exchange provides failover host list, as specified in AMQP 0-10. - */ -class FailoverExchange : public broker::Exchange -{ - public: - static const std::string typeName; - - FailoverExchange(management::Manageable* parent, broker::Broker* b); - - /** Set the URLs but don't send an update.*/ - void setUrls(const std::vector<Url>&); - /** Set the URLs and send an update.*/ - void updateUrls(const std::vector<Url>&); - - // Exchange overrides - std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); - bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); - bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); - void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); - - private: - void sendUpdate(const boost::shared_ptr<broker::Queue>&); - - typedef sys::Mutex::ScopedLock Lock; - typedef std::vector<Url> Urls; - typedef std::set<boost::shared_ptr<broker::Queue> > Queues; - - sys::Mutex lock; - Urls urls; - Queues queues; - -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_FAILOVEREXCHANGE_H*/ diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp deleted file mode 100644 index c8ecc13f2c..0000000000 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ /dev/null @@ -1,226 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "InitialStatusMap.h" -#include "StoreStatus.h" -#include "qpid/log/Statement.h" -#include <algorithm> -#include <vector> -#include <boost/bind.hpp> - -namespace qpid { -namespace cluster { - -using namespace std; -using namespace boost; -using namespace framing::cluster; -using namespace framing; - -InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) - : self(self_), completed(), resendNeeded(), size(size_) -{} - -void InitialStatusMap::configChange(const MemberSet& members) { - resendNeeded = false; - bool wasComplete = isComplete(); - if (firstConfig.empty()) firstConfig = members; - MemberSet::const_iterator i = members.begin(); - Map::iterator j = map.begin(); - while (i != members.end() || j != map.end()) { - if (i == members.end()) { // j not in members, member left - firstConfig.erase(j->first); - Map::iterator k = j++; - map.erase(k); - } - else if (j == map.end()) { // i not in map, member joined - resendNeeded = true; - map[*i] = optional<Status>(); - ++i; - } - else if (*i < j->first) { // i not in map, member joined - resendNeeded = true; - map[*i] = optional<Status>(); - ++i; - } - else if (*i > j->first) { // j not in members, member left - firstConfig.erase(j->first); - Map::iterator k = j++; - map.erase(k); - } - else { - i++; j++; - } - } - if (resendNeeded) { // Clear all status - for (Map::iterator i = map.begin(); i != map.end(); ++i) - i->second = optional<Status>(); - } - completed = isComplete() && !wasComplete; // Set completed on the transition. -} - -void InitialStatusMap::received(const MemberId& m, const Status& s){ - bool wasComplete = isComplete(); - map[m] = s; - completed = isComplete() && !wasComplete; // Set completed on the transition. -} - -bool InitialStatusMap::notInitialized(const Map::value_type& v) { - return !v.second; -} - -bool InitialStatusMap::isComplete() const { - return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end(); -} - -bool InitialStatusMap::transitionToComplete() { - return completed; -} - -bool InitialStatusMap::isResendNeeded() { - bool ret = resendNeeded; - resendNeeded = false; - return ret; -} - -bool InitialStatusMap::isActiveEntry(const Map::value_type& v) { - return v.second && v.second->getActive(); -} - -bool InitialStatusMap::hasStore(const Map::value_type& v) { - return v.second && - (v.second->getStoreState() == STORE_STATE_CLEAN_STORE || - v.second->getStoreState() == STORE_STATE_DIRTY_STORE); -} - -bool InitialStatusMap::isActive() { - assert(isComplete()); - return (find_if(map.begin(), map.end(), &isActiveEntry) != map.end()); -} - -bool InitialStatusMap::isUpdateNeeded() { - assert(isComplete()); - // We need an update if there are any active members. - if (isActive()) return true; - - // Otherwise it depends on store status, get my own status: - Map::iterator me = map.find(self); - assert(me != map.end()); - assert(me->second); - switch (me->second->getStoreState()) { - case STORE_STATE_NO_STORE: - case STORE_STATE_EMPTY_STORE: - // If anybody has a store then we need an update. - return find_if(map.begin(), map.end(), &hasStore) != map.end(); - case STORE_STATE_DIRTY_STORE: return true; - case STORE_STATE_CLEAN_STORE: return false; // Use our own store - } - return false; -} - -MemberSet InitialStatusMap::getElders() const { - assert(isComplete()); - MemberSet elders; - for (MemberSet::const_iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) { - // *i is in my first config, so a potential elder. - if (*i == self) continue; // Not my own elder - Map::const_iterator j = map.find(*i); - assert(j != map.end()); - assert(j->second); - const Status& s = *j->second; - // If I'm not in i's first config then i is older than me. - // Otherwise we were born in the same configuration so use - // member ID to break the tie. - MemberSet iFirstConfig = decodeMemberSet(s.getFirstConfig()); - if (iFirstConfig.find(self) == iFirstConfig.end() || *i > self) - elders.insert(*i); - } - return elders; -} - -// Get cluster ID from an active member or the youngest newcomer. -Uuid InitialStatusMap::getClusterId() { - assert(isComplete()); - assert(!map.empty()); - Map::iterator i = find_if(map.begin(), map.end(), &isActiveEntry); - if (i != map.end()) - return i->second->getClusterId(); // An active member - else - return map.begin()->second->getClusterId(); // Youngest newcomer in node-id order -} - -void checkId(Uuid& expect, const Uuid& actual, const string& msg) { - if (!expect) expect = actual; - assert(expect); - if (expect != actual) - throw Exception(msg); -} - -void InitialStatusMap::checkConsistent() { - assert(isComplete()); - int clean = 0; - int dirty = 0; - int empty = 0; - int none = 0; - int active = 0; - Uuid clusterId; - Uuid shutdownId; - - bool initialCluster = !isActive(); - for (Map::iterator i = map.begin(); i != map.end(); ++i) { - assert(i->second); - if (i->second->getActive()) ++active; - switch (i->second->getStoreState()) { - case STORE_STATE_NO_STORE: ++none; break; - case STORE_STATE_EMPTY_STORE: ++empty; break; - case STORE_STATE_DIRTY_STORE: - ++dirty; - checkId(clusterId, i->second->getClusterId(), - "Cluster-ID mismatch. Stores belong to different clusters."); - break; - case STORE_STATE_CLEAN_STORE: - ++clean; - checkId(clusterId, i->second->getClusterId(), - "Cluster-ID mismatch. Stores belong to different clusters."); - // Only need shutdownId to match if we are in an initially forming cluster. - if (initialCluster) - checkId(shutdownId, i->second->getShutdownId(), - "Shutdown-ID mismatch. Stores were not shut down together"); - break; - } - } - // Can't mix transient and persistent members. - if (none && (clean+dirty+empty)) - throw Exception("Mixing transient and persistent brokers in a cluster"); - - if (map.size() >= size) { - // All initial members are present. If there are no active - // members and there are dirty stores there must be at least - // one clean store. - if (!active && dirty && !clean) - throw Exception("Cannot recover, no clean store."); - } -} - -std::string InitialStatusMap::getFirstConfigStr() const { - assert(!firstConfig.empty()); - return encodeMemberSet(firstConfig); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h deleted file mode 100644 index a5a600365e..0000000000 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ /dev/null @@ -1,91 +0,0 @@ -#ifndef QPID_CLUSTER_INITIALSTATUSMAP_H -#define QPID_CLUSTER_INITIALSTATUSMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "MemberSet.h" -#include <qpid/framing/ClusterInitialStatusBody.h> -#include <boost/optional.hpp> - -namespace qpid { -namespace cluster { - -/** - * Track status of cluster members during initialization. - * - * When a new member joins the CPG cluster, all members send an initial-status - * control. This map tracks those controls and provides data to make descisions - * about joining the cluster. - * - */ -class InitialStatusMap -{ - public: - typedef framing::ClusterInitialStatusBody Status; - - InitialStatusMap(const MemberId& self, size_t size); - /** Process a config change. May make isResendNeeded() true. */ - void configChange(const MemberSet& newConfig); - /** @return true if we need to re-send status */ - bool isResendNeeded(); - - /** Process received status */ - void received(const MemberId&, const Status& is); - - /**@return true if the map has an entry for all current cluster members. */ - bool isComplete() const; - - size_t getActualSize() const { return map.size(); } - size_t getRequiredSize() const { return size; } - - /**@return true if the map was completed by the last config change or received. */ - bool transitionToComplete(); - /**@pre isComplete(). @return this node's elders */ - MemberSet getElders() const; - /**@pre isComplete(). @return True if there are active members of the cluster. */ - bool isActive(); - /**@pre isComplete(). @return True if we need to request an update. */ - bool isUpdateNeeded(); - /**@pre isComplete(). @return Cluster-wide cluster ID. */ - framing::Uuid getClusterId(); - /**@pre isComplete(). @throw Exception if there are any inconsistencies. */ - void checkConsistent(); - - /** Get first config-change for this member, encoded as a string. - *@pre configChange has been called at least once. - */ - std::string getFirstConfigStr() const; - private: - typedef std::map<MemberId, boost::optional<Status> > Map; - static bool notInitialized(const Map::value_type&); - static bool isActiveEntry(const Map::value_type&); - static bool hasStore(const Map::value_type&); - - Map map; - MemberSet firstConfig; - MemberId self; - bool completed, resendNeeded; - size_t size; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_INITIALSTATUSMAP_H*/ diff --git a/cpp/src/qpid/cluster/LockedConnectionMap.h b/cpp/src/qpid/cluster/LockedConnectionMap.h deleted file mode 100644 index ac744d4f94..0000000000 --- a/cpp/src/qpid/cluster/LockedConnectionMap.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef QPID_CLUSTER_LOCKEDCONNECTIONMAP_H -#define QPID_CLUSTER_LOCKEDCONNECTIONMAP_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/sys/Mutex.h" -#include "qpid/cluster/Connection.h" - -namespace qpid { -namespace cluster { - -/** - * Thread safe map of connections. - */ -class LockedConnectionMap -{ - public: - void insert(const ConnectionPtr& c) { - sys::Mutex::ScopedLock l(lock); - assert(map.find(c->getId()) == map.end()); - map[c->getId()] = c; - } - - ConnectionPtr getErase(const ConnectionId& c) { - sys::Mutex::ScopedLock l(lock); - Map::iterator i = map.find(c); - if (i != map.end()) { - ConnectionPtr cp = i->second; - map.erase(i); - return cp; - } - else - return 0; - } - - void clear() { sys::Mutex::ScopedLock l(lock); map.clear(); } - - private: - typedef std::map<ConnectionId, ConnectionPtr> Map; - mutable sys::Mutex lock; - Map map; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_LOCKEDCONNECTIONMAP_H*/ diff --git a/cpp/src/qpid/cluster/McastFrameHandler.h b/cpp/src/qpid/cluster/McastFrameHandler.h deleted file mode 100644 index 17e4c2e9f0..0000000000 --- a/cpp/src/qpid/cluster/McastFrameHandler.h +++ /dev/null @@ -1,46 +0,0 @@ -#ifndef QPID_CLUSTER_MCASTFRAMEHANDLER_H -#define QPID_CLUSTER_MCASTFRAMEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/cluster/Multicaster.h" -#include "qpid/framing/FrameHandler.h" - -namespace qpid { -namespace cluster { - -/** - * A frame handler that multicasts frames as CONTROL events. - */ -class McastFrameHandler : public framing::FrameHandler -{ - public: - McastFrameHandler(Multicaster& m, const ConnectionId& cid) : mcast(m), connection(cid) {} - void handle(framing::AMQFrame& frame) { mcast.mcastControl(frame, connection); } - private: - Multicaster& mcast; - ConnectionId connection; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MCASTFRAMEHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/MemberSet.cpp b/cpp/src/qpid/cluster/MemberSet.cpp deleted file mode 100644 index 97748947b3..0000000000 --- a/cpp/src/qpid/cluster/MemberSet.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "MemberSet.h" -#include <ostream> -#include <iterator> -#include <algorithm> - -namespace qpid { -namespace cluster { - -std::string encodeMemberSet(const MemberSet& m) { - std::string addresses; - for (MemberSet::const_iterator i = m.begin(); i != m.end(); ++i) - addresses.append(i->str()); - return addresses; -} - -MemberSet decodeMemberSet(const std::string& s) { - MemberSet set; - for (std::string::const_iterator i = s.begin(); i < s.end(); i += 8) { - assert(size_t(i-s.begin())+8 <= s.size()); - set.insert(MemberId(std::string(i, i+8))); - } - return set; -} - -MemberSet intersection(const MemberSet& a, const MemberSet& b) -{ - MemberSet intersection; - std::set_intersection(a.begin(), a.end(), - b.begin(), b.end(), - std::inserter(intersection, intersection.begin())); - return intersection; - -} - -std::ostream& operator<<(std::ostream& o, const MemberSet& ms) { - copy(ms.begin(), ms.end(), std::ostream_iterator<MemberId>(o, " ")); - return o; -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/MemberSet.h b/cpp/src/qpid/cluster/MemberSet.h deleted file mode 100644 index 7c97145dc1..0000000000 --- a/cpp/src/qpid/cluster/MemberSet.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef QPID_CLUSTER_MEMBERSET_H -#define QPID_CLUSTER_MEMBERSET_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include <set> -#include <iosfwd> - -namespace qpid { -namespace cluster { - -typedef std::set<MemberId> MemberSet; - -std::string encodeMemberSet(const MemberSet&); - -MemberSet decodeMemberSet(const std::string&); - -MemberSet intersection(const MemberSet& a, const MemberSet& b); - -std::ostream& operator<<(std::ostream& o, const MemberSet& ms); - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MEMBERSET_H*/ diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp deleted file mode 100644 index 8916de9628..0000000000 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/Multicaster.h" -#include "qpid/cluster/Cpg.h" -#include "qpid/log/Statement.h" -#include "qpid/framing/AMQBody.h" -#include "qpid/framing/AMQFrame.h" - -namespace qpid { -namespace cluster { - -Multicaster::Multicaster(Cpg& cpg_, - const boost::shared_ptr<sys::Poller>& poller, - boost::function<void()> onError_) : - onError(onError_), cpg(cpg_), - queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - ready(false), bypass(true) -{} - -void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { - mcast(Event::control(body, id)); -} - -void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) { - mcast(Event::control(frame, id)); -} - -void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { - Event e(DATA, id, size); - memcpy(e.getData(), data, size); - mcast(e); -} - -void Multicaster::mcast(const Event& e) { - { - sys::Mutex::ScopedLock l(lock); - if (!ready && e.isConnection()) { - holdingQueue.push_back(e); - return; - } - } - QPID_LOG(trace, "MCAST " << e); - if (bypass) { // direct, don't queue - iovec iov = e.toIovec(); - while (!cpg.mcast(&iov, 1)) - ; - } - else - queue.push(e); -} - -Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) { - try { - PollableEventQueue::Batch::const_iterator i = values.begin(); - while( i != values.end()) { - iovec iov = i->toIovec(); - if (!cpg.mcast(&iov, 1)) { - // cpg didn't send because of CPG flow control. - break; - } - ++i; - } - return i; - } - catch (const std::exception& e) { - QPID_LOG(critical, "Multicast error: " << e.what()); - queue.stop(); - onError(); - return values.end(); - } -} - -void Multicaster::start() { - queue.start(); - bypass = false; -} - -void Multicaster::setReady() { - sys::Mutex::ScopedLock l(lock); - ready = true; - std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1)); - holdingQueue.clear(); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h deleted file mode 100644 index f70bd5ca31..0000000000 --- a/cpp/src/qpid/cluster/Multicaster.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef QPID_CLUSTER_MULTICASTER_H -#define QPID_CLUSTER_MULTICASTER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/types.h" -#include "qpid/cluster/Event.h" -#include "qpid/sys/PollableQueue.h" -#include "qpid/sys/Mutex.h" -#include <boost/shared_ptr.hpp> -#include <deque> - -namespace qpid { - -namespace sys { -class Poller; -} - -namespace cluster { - -class Cpg; - -/** - * Multicast to the cluster. Shared, thread safe object. - * - * holding mode: Hold connection events for later multicast. Cluster - * events are never held. Used during PRE_INIT/INIT state when we - * want to hold any connection traffic till we are read in the - * cluster. - * - * bypass mode: Multicast cluster events directly in the calling - * thread. This mode is used by cluster in PRE_INIT state the poller - * is not yet be active. - * - * Multicaster is created in bypass+holding mode, they are disabled by - * start and setReady respectively. - */ -class Multicaster -{ - public: - /** Starts in initializing mode. */ - Multicaster(Cpg& cpg_, - const boost::shared_ptr<sys::Poller>&, - boost::function<void()> onError - ); - void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&); - void mcastControl(const framing::AMQFrame& controlFrame, const ConnectionId&); - void mcastBuffer(const char*, size_t, const ConnectionId&); - void mcast(const Event& e); - - /** Start the pollable queue, turn off bypass mode. */ - void start(); - /** Switch to ready mode, release held messages. */ - void setReady(); - - private: - typedef sys::PollableQueue<Event> PollableEventQueue; - typedef std::deque<Event> PlainEventQueue; - - PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& ); - - sys::Mutex lock; - boost::function<void()> onError; - Cpg& cpg; - PollableEventQueue queue; - bool ready; - PlainEventQueue holdingQueue; - std::vector<struct ::iovec> ioVector; - bool bypass; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_MULTICASTER_H*/ diff --git a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h b/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h deleted file mode 100644 index 566a82476e..0000000000 --- a/cpp/src/qpid/cluster/NoOpConnectionOutputHandler.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H -#define QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <qpid/sys/ConnectionOutputHandler.h> - -namespace qpid { - -namespace framing { class AMQFrame; } - -namespace cluster { - -/** - * Output handler shadow connections, simply discards frames. - */ -class NoOpConnectionOutputHandler : public sys::ConnectionOutputHandler -{ - public: - virtual void send(framing::AMQFrame&) {} - virtual void close() {} - virtual void abort() {} - virtual void activateOutput() {} - virtual void giveReadCredit(int32_t) {} -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_NOOPCONNECTIONOUTPUTHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Numbering.h b/cpp/src/qpid/cluster/Numbering.h deleted file mode 100644 index 99e152c212..0000000000 --- a/cpp/src/qpid/cluster/Numbering.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef QPID_CLUSTER_NUMBERING_H -#define QPID_CLUSTER_NUMBERING_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <map> -#include <vector> - -namespace qpid { -namespace cluster { - -/** - * A set of numbered T, with two way mapping number->T T->number - * Used to construct numberings of objects by code sending and receiving updates. - */ -template <class T> class Numbering -{ - public: - size_t size() const { return byNumber.size(); } - - size_t add(const T& t) { - size_t n = (*this)[t]; // Already in the set? - if (n == size()) { - byObject[t] = n; - byNumber.push_back(t); - } - return n; - } - - void clear() { byObject.clear(); byNumber.clear(); } - - /**@return object at index n or T() if n > size() */ - T operator[](size_t n) const { return(n < size()) ? byNumber[n] : T(); } - - /**@return index of t or size() if t is not in the map */ - size_t operator[](const T& t) const { - typename Map::const_iterator i = byObject.find(t); - return (i != byObject.end()) ? i->second : size(); - } - - private: - typedef std::map<T, size_t> Map; - Map byObject; - std::vector<T> byNumber; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_NUMBERING_H*/ diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp deleted file mode 100644 index 4bf03eefa2..0000000000 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/OutputInterceptor.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/log/Statement.h" -#include <boost/current_function.hpp> - - -namespace qpid { -namespace cluster { - -using namespace framing; -using namespace std; - -NoOpConnectionOutputHandler OutputInterceptor::discardHandler; - -OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h) - : parent(p), closing(false), next(&h), sendMax(2048), sent(0), sentDoOutput(false) -{} - -void OutputInterceptor::send(framing::AMQFrame& f) { - sys::Mutex::ScopedLock l(lock); - next->send(f); -} - -void OutputInterceptor::activateOutput() { - sys::Mutex::ScopedLock l(lock); - if (parent.isCatchUp()) - next->activateOutput(); - else - sendDoOutput(sendMax, l); -} - -void OutputInterceptor::abort() { - sys::Mutex::ScopedLock l(lock); - if (parent.isLocal()) { - next->abort(); - } -} - -void OutputInterceptor::giveReadCredit(int32_t credit) { - sys::Mutex::ScopedLock l(lock); - next->giveReadCredit(credit); -} - -// Called in write thread when the IO layer has no more data to write. -// We only process IO callbacks in the write thread during catch-up. -// Normally we run doOutput only on delivery of doOutput requests. -bool OutputInterceptor::doOutput() { - parent.doCatchupIoCallbacks(); - return false; -} - -// Send output up to limit, calculate new limit. -void OutputInterceptor::deliverDoOutput(uint32_t limit) { - sys::Mutex::ScopedLock l(lock); - sentDoOutput = false; - sendMax = limit; - size_t newLimit = limit; - if (parent.isLocal()) { - size_t buffered = next->getBuffered(); - if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit. - newLimit = sendMax*2; - else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit. - newLimit = (sendMax + sent) / 2; - } - sent = 0; - while (sent < limit) { - { - sys::Mutex::ScopedUnlock u(lock); - if (!parent.getBrokerConnection()->doOutput()) break; - } - ++sent; - } - if (sent == limit) sendDoOutput(newLimit, l); -} - -void OutputInterceptor::sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&) { - if (parent.isLocal() && !sentDoOutput && !closing) { - sentDoOutput = true; - parent.getCluster().getMulticast().mcastControl( - ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit), - parent.getId()); - } -} - -// Called in connection thread when local connection closes. -void OutputInterceptor::closeOutput() { - sys::Mutex::ScopedLock l(lock); - closing = true; - next = &discardHandler; -} - -void OutputInterceptor::close() { - sys::Mutex::ScopedLock l(lock); - next->close(); -} - -size_t OutputInterceptor::getBuffered() const { - sys::Mutex::ScopedLock l(lock); - return next->getBuffered(); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/OutputInterceptor.h b/cpp/src/qpid/cluster/OutputInterceptor.h deleted file mode 100644 index 3abf5273a0..0000000000 --- a/cpp/src/qpid/cluster/OutputInterceptor.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef QPID_CLUSTER_OUTPUTINTERCEPTOR_H -#define QPID_CLUSTER_OUTPUTINTERCEPTOR_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/NoOpConnectionOutputHandler.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/Mutex.h" -#include "qpid/broker/ConnectionFactory.h" -#include <boost/function.hpp> - -namespace qpid { -namespace framing { class AMQFrame; } -namespace cluster { - -class Connection; - -/** - * Interceptor for connection OutputHandler, manages outgoing message replication. - */ -class OutputInterceptor : public sys::ConnectionOutputHandler { - public: - OutputInterceptor(cluster::Connection& p, sys::ConnectionOutputHandler& h); - - // sys::ConnectionOutputHandler functions - void send(framing::AMQFrame& f); - void abort(); - void activateOutput(); - void giveReadCredit(int32_t); - void close(); - size_t getBuffered() const; - - // Delivery point for doOutput requests. - void deliverDoOutput(uint32_t limit); - // Intercept doOutput requests on Connection. - bool doOutput(); - - void closeOutput(); - - uint32_t getSendMax() const { return sendMax; } - void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; } - - cluster::Connection& parent; - - private: - typedef sys::Mutex::ScopedLock Locker; - - void sendDoOutput(size_t newLimit, const sys::Mutex::ScopedLock&); - - mutable sys::Mutex lock; - bool closing; - sys::ConnectionOutputHandler* next; - static NoOpConnectionOutputHandler discardHandler; - uint32_t sendMax, sent; - bool sentDoOutput; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_OUTPUTINTERCEPTOR_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h deleted file mode 100644 index 10e2ed6ac3..0000000000 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ /dev/null @@ -1,89 +0,0 @@ -#ifndef QPID_CLUSTER_POLLABLEQUEUE_H -#define QPID_CLUSTER_POLLABLEQUEUE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/PollableQueue.h" -#include <qpid/log/Statement.h> - -namespace qpid { -namespace cluster { - -/** - * More convenient version of PollableQueue that handles iterating - * over the batch and error handling. - * - * Constructed in "bypass" mode where items are processed directly - * rather than put on the queue. This is important for the - * PRE_INIT stage when Cluster is pumping CPG dispatch directly - * before the poller has started. - * - * Calling start() starts the pollable queue and disabled bypass mode. - */ -template <class T> class PollableQueue : public sys::PollableQueue<T> { - public: - typedef boost::function<void (const T&)> Callback; - typedef boost::function<void()> ErrorCallback; - - PollableQueue(Callback f, ErrorCallback err, const std::string& msg, - const boost::shared_ptr<sys::Poller>& poller) - : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), - poller), - callback(f), error(err), message(msg), bypass(true) - {} - - typename sys::PollableQueue<T>::Batch::const_iterator - handleBatch(const typename sys::PollableQueue<T>::Batch& values) { - try { - typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin(); - while (i != values.end() && !this->isStopped()) { - callback(*i); - ++i; - } - return i; - } - catch (const std::exception& e) { - QPID_LOG(critical, message << ": " << e.what()); - this->stop(); - error(); - return values.end(); - } - } - - void push(const T& t) { - if (bypass) callback(t); - else sys::PollableQueue<T>::push(t); - } - - void bypassOff() { bypass = false; } - - private: - Callback callback; - ErrorCallback error; - std::string message; - bool bypass; -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ diff --git a/cpp/src/qpid/cluster/PollerDispatch.cpp b/cpp/src/qpid/cluster/PollerDispatch.cpp deleted file mode 100644 index b8d94b95a5..0000000000 --- a/cpp/src/qpid/cluster/PollerDispatch.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/PollerDispatch.h" - -#include "qpid/log/Statement.h" -#include <boost/bind.hpp> - -namespace qpid { -namespace cluster { - -PollerDispatch::PollerDispatch(Cpg& c, boost::shared_ptr<sys::Poller> p, - boost::function<void()> e) - : cpg(c), poller(p), onError(e), - dispatchHandle(cpg, - boost::bind(&PollerDispatch::dispatch, this, _1), // read - 0, // write - boost::bind(&PollerDispatch::disconnect, this, _1) // disconnect - ), - started(false) -{} - -PollerDispatch::~PollerDispatch() { - if (started) - dispatchHandle.stopWatch(); -} - -void PollerDispatch::start() { - dispatchHandle.startWatch(poller); - started = true; -} - -// Entry point: called by IO to dispatch CPG events. -void PollerDispatch::dispatch(sys::DispatchHandle& h) { - try { - cpg.dispatchAll(); - h.rewatch(); - } catch (const std::exception& e) { - QPID_LOG(critical, "Error in cluster dispatch: " << e.what()); - onError(); - } -} - -// Entry point: called if disconnected from CPG. -void PollerDispatch::disconnect(sys::DispatchHandle& ) { - if (!poller->hasShutdown()) { - QPID_LOG(critical, "Disconnected from cluster"); - onError(); - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/PollerDispatch.h b/cpp/src/qpid/cluster/PollerDispatch.h deleted file mode 100644 index 63801e0de9..0000000000 --- a/cpp/src/qpid/cluster/PollerDispatch.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_CLUSTER_POLLERDISPATCH_H -#define QPID_CLUSTER_POLLERDISPATCH_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/Cpg.h" -#include "qpid/sys/Poller.h" -#include "qpid/sys/DispatchHandle.h" -#include <boost/function.hpp> - -namespace qpid { -namespace cluster { - -/** - * Dispatch CPG events via the poller. - */ -class PollerDispatch { - public: - PollerDispatch(Cpg&, boost::shared_ptr<sys::Poller> poller, - boost::function<void()> onError) ; - - ~PollerDispatch(); - - void start(); - - private: - // Poller callbacks - void dispatch(sys::DispatchHandle&); // Dispatch CPG events. - void disconnect(sys::DispatchHandle&); // CPG was disconnected - - Cpg& cpg; - boost::shared_ptr<sys::Poller> poller; - boost::function<void()> onError; - sys::DispatchHandleRef dispatchHandle; - bool started; - - -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_POLLERDISPATCH_H*/ diff --git a/cpp/src/qpid/cluster/ProxyInputHandler.h b/cpp/src/qpid/cluster/ProxyInputHandler.h deleted file mode 100644 index ad7f2c44bd..0000000000 --- a/cpp/src/qpid/cluster/ProxyInputHandler.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_CLUSTER_PROXYINPUTHANDLER_H -#define QPID_CLUSTER_PROXYINPUTHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/ConnectionInputHandler.h" -#include <boost/intrusive_ptr.hpp> - -namespace qpid { - -namespace framing { class AMQFrame; } - -namespace cluster { - -/** - * Proxies ConnectionInputHandler functions and ensures target.closed() - * is called, on deletion if not before. - */ -class ProxyInputHandler : public sys::ConnectionInputHandler -{ - public: - ProxyInputHandler(boost::intrusive_ptr<cluster::Connection> t) : target(t) {} - ~ProxyInputHandler() { closed(); } - - void received(framing::AMQFrame& f) { target->received(f); } - void closed() { if (target) target->closed(); target = 0; } - void idleOut() { target->idleOut(); } - void idleIn() { target->idleIn(); } - bool doOutput() { return target->doOutput(); } - - private: - boost::intrusive_ptr<cluster::Connection> target; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_PROXYINPUTHANDLER_H*/ diff --git a/cpp/src/qpid/cluster/Quorum.h b/cpp/src/qpid/cluster/Quorum.h deleted file mode 100644 index bbfa473f94..0000000000 --- a/cpp/src/qpid/cluster/Quorum.h +++ /dev/null @@ -1,32 +0,0 @@ -#ifndef QPID_CLUSTER_QUORUM_H -#define QPID_CLUSTER_QUORUM_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "config.h" - -#if HAVE_LIBCMAN_H -#include "qpid/cluster/Quorum_cman.h" -#else -#include "qpid/cluster/Quorum_null.h" -#endif - -#endif /*!QPID_CLUSTER_QUORUM_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp deleted file mode 100644 index 728f824b16..0000000000 --- a/cpp/src/qpid/cluster/Quorum_cman.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/Quorum_cman.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/log/Statement.h" -#include "qpid/Options.h" -#include "qpid/sys/Time.h" -#include "qpid/sys/posix/PrivatePosix.h" - -namespace qpid { -namespace cluster { - -namespace { - -boost::function<void()> errorFn; - -void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int /*arg*/) { - if (reason == CMAN_REASON_STATECHANGE && !cman_is_quorate(handle)) { - QPID_LOG(critical, "Lost contact with cluster quorum."); - if (errorFn) errorFn(); - cman_stop_notification(handle); - } -} -} - -Quorum::Quorum(boost::function<void()> err) : cman(0), cmanFd(0) { - errorFn = err; -} - -Quorum::~Quorum() { - if (dispatchHandle.get()) dispatchHandle->stopWatch(); - dispatchHandle.reset(); - if (cman) cman_finish(cman); -} - -void Quorum::start(boost::shared_ptr<sys::Poller> p) { - poller = p; - QPID_LOG(debug, "Connecting to quorum service."); - cman = cman_init(0); - if (cman == 0) throw ErrnoException("Can't connect to cman service"); - if (!cman_is_quorate(cman)) { - QPID_LOG(notice, "Waiting for cluster quorum."); - while(!cman_is_quorate(cman)) sys::sleep(5); - } - int err = cman_start_notification(cman, cmanCallbackFn); - if (err != 0) throw ErrnoException("Can't register for cman notifications"); - watch(getFd()); -} - -void Quorum::watch(int fd) { - cmanFd = fd; - if (dispatchHandle.get()) dispatchHandle->stopWatch(); - ioHandle.reset(new sys::PosixIOHandle(cmanFd)); - dispatchHandle.reset( - new sys::DispatchHandleRef( - *ioHandle, // This must outlive the dispatchHandleRef - boost::bind(&Quorum::dispatch, this, _1), // read - 0, // write - boost::bind(&Quorum::disconnect, this, _1) // disconnect - )); - dispatchHandle->startWatch(poller); -} - -int Quorum::getFd() { - int fd = cman_get_fd(cman); - if (fd == 0) throw ErrnoException("Can't get cman file descriptor"); - return fd; -} - -void Quorum::dispatch(sys::DispatchHandle&) { - try { - cman_dispatch(cman, CMAN_DISPATCH_ALL); - int fd = getFd(); - if (fd != cmanFd) watch(fd); - } catch (const std::exception& e) { - QPID_LOG(critical, "Error in quorum dispatch: " << e.what()); - errorFn(); - } -} - -void Quorum::disconnect(sys::DispatchHandle&) { - QPID_LOG(critical, "Disconnected from quorum service"); - errorFn(); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h deleted file mode 100644 index 98e6baee89..0000000000 --- a/cpp/src/qpid/cluster/Quorum_cman.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef QPID_CLUSTER_QUORUM_CMAN_H -#define QPID_CLUSTER_QUORUM_CMAN_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <qpid/sys/DispatchHandle.h> -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> -#include <memory> - -extern "C" { -#include <libcman.h> -} - -namespace qpid { -namespace sys { -class Poller; -class PosixIOHandle; -} - -namespace cluster { -class Cluster; - -class Quorum { - public: - Quorum(boost::function<void ()> onError); - ~Quorum(); - void start(boost::shared_ptr<sys::Poller>); - - private: - void dispatch(sys::DispatchHandle&); - void disconnect(sys::DispatchHandle&); - int getFd(); - void watch(int fd); - - cman_handle_t cman; - int cmanFd; - std::auto_ptr<sys::PosixIOHandle> ioHandle; - std::auto_ptr<sys::DispatchHandleRef> dispatchHandle; - boost::shared_ptr<sys::Poller> poller; -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h deleted file mode 100644 index dc27f0a43b..0000000000 --- a/cpp/src/qpid/cluster/Quorum_null.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef QPID_CLUSTER_QUORUM_NULL_H -#define QPID_CLUSTER_QUORUM_NULL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <boost/shared_ptr.hpp> -#include <boost/function.hpp> - -namespace qpid { -namespace cluster { -class Cluster; - -/** Null implementation of quorum. */ - -class Quorum { - public: - Quorum(boost::function<void ()>) {} - void start(boost::shared_ptr<sys::Poller>) {} -}; - -#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/ - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/RetractClient.cpp b/cpp/src/qpid/cluster/RetractClient.cpp deleted file mode 100644 index a8c4b0d543..0000000000 --- a/cpp/src/qpid/cluster/RetractClient.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/RetractClient.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/framing/ClusterConnectionRetractOfferBody.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/log/Statement.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -namespace { - -struct AutoClose { - client::Connection& connection; - AutoClose(client::Connection& c) : connection(c) {} - ~AutoClose() { connection.close(); } -}; -} - -RetractClient::RetractClient(const Url& u, const client::ConnectionSettings& cs) - : url(u), connectionSettings(cs) -{} - -RetractClient::~RetractClient() { delete this; } - - -void RetractClient::run() { - try { - client::Connection c = UpdateClient::catchUpConnection(); - c.open(url, connectionSettings); - AutoClose ac(c); - AMQFrame retract((ClusterConnectionRetractOfferBody())); - client::ConnectionAccess::getImpl(c)->expand(retract.encodedSize(), false); - client::ConnectionAccess::getImpl(c)->handle(retract); - } catch (const std::exception& e) { - QPID_LOG(error, " while retracting retract to " << url << ": " << e.what()); - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/RetractClient.h b/cpp/src/qpid/cluster/RetractClient.h deleted file mode 100644 index 533fc3f7ef..0000000000 --- a/cpp/src/qpid/cluster/RetractClient.h +++ /dev/null @@ -1,50 +0,0 @@ -#ifndef QPID_CLUSTER_RETRACTCLIENT_H -#define QPID_CLUSTER_RETRACTCLIENT_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/client/ConnectionSettings.h" -#include "qpid/sys/Runnable.h" -#include "qpid/Url.h" - - -namespace qpid { -namespace cluster { - -/** - * A client that retracts an offer to a remote broker using AMQP. @see UpdateClient - */ -class RetractClient : public sys::Runnable { - public: - - RetractClient(const Url&, const client::ConnectionSettings&); - ~RetractClient(); - void run(); // Will delete this when finished. - - private: - Url url; - client::ConnectionSettings connectionSettings; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_RETRACTCLIENT_H*/ diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp b/cpp/src/qpid/cluster/SecureConnectionFactory.cpp deleted file mode 100644 index 6ddef66226..0000000000 --- a/cpp/src/qpid/cluster/SecureConnectionFactory.cpp +++ /dev/null @@ -1,73 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/cluster/SecureConnectionFactory.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/cluster/ConnectionCodec.h" -#include "qpid/broker/SecureConnection.h" -#include "qpid/sys/SecuritySettings.h" -#include "qpid/log/Statement.h" -#include <memory> - - -namespace qpid { -namespace cluster { - -using framing::ProtocolVersion; -using qpid::sys::SecuritySettings; -using qpid::broker::SecureConnection; - -typedef std::auto_ptr<qpid::broker::SecureConnection> SecureConnectionPtr; -typedef std::auto_ptr<qpid::sys::ConnectionCodec> CodecPtr; - -SecureConnectionFactory::SecureConnectionFactory(CodecFactoryPtr f) : codecFactory(f) { -} - -sys::ConnectionCodec* -SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, - const SecuritySettings& external) { - CodecPtr codec(codecFactory->create(v, out, id, external)); - ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); - if (clusterCodec) { - SecureConnectionPtr sc(new SecureConnection()); - clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); - return sc.release(); - } - return 0; -} - -sys::ConnectionCodec* -SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, - const SecuritySettings& external) { - // used to create connections from one broker to another - CodecPtr codec(codecFactory->create(out, id, external)); - ConnectionCodec* clusterCodec = dynamic_cast<qpid::cluster::ConnectionCodec*>(codec.get()); - if (clusterCodec) { - SecureConnectionPtr sc(new SecureConnection()); - clusterCodec->setSecureConnection(sc.get()); - sc->setCodec(codec); - return sc.release(); - } - return 0; -} - - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/SecureConnectionFactory.h b/cpp/src/qpid/cluster/SecureConnectionFactory.h deleted file mode 100644 index 24d1fcfee5..0000000000 --- a/cpp/src/qpid/cluster/SecureConnectionFactory.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef QPID_CLUSTER_SecureconnectionFactory -#define QPID_CLUSTER_SecureconnectionFactory - -#include "qpid/sys/ConnectionCodec.h" -#include <boost/shared_ptr.hpp> - -namespace qpid { - -namespace broker { - class Broker; -} - -namespace cluster { - -class SecureConnectionFactory : public qpid::sys::ConnectionCodec::Factory -{ - public: - typedef boost::shared_ptr<qpid::sys::ConnectionCodec::Factory> CodecFactoryPtr; - SecureConnectionFactory(CodecFactoryPtr f); - - qpid::sys::ConnectionCodec* create( - framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string& id, - const qpid::sys::SecuritySettings& - ); - - /** Return "preferred" codec for outbound connections. */ - qpid::sys::ConnectionCodec* create( - qpid::sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings& - ); - - private: - CodecFactoryPtr codecFactory; -}; - -}} // namespace qpid::cluster - - -#endif // QPID_CLUSTER_SecureconnectionFactory diff --git a/cpp/src/qpid/cluster/StoreStatus.cpp b/cpp/src/qpid/cluster/StoreStatus.cpp deleted file mode 100644 index 14c999bb05..0000000000 --- a/cpp/src/qpid/cluster/StoreStatus.cpp +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "StoreStatus.h" -#include "qpid/Exception.h" -#include "qpid/Msg.h" -#include "qpid/log/Statement.h" -#include <boost/filesystem/path.hpp> -#include <boost/filesystem/fstream.hpp> -#include <boost/filesystem/operations.hpp> -#include <boost/scoped_array.hpp> -#include <fstream> -#include <sstream> - -namespace qpid { -namespace cluster { - -using framing::Uuid; -using namespace framing::cluster; -namespace fs=boost::filesystem; -using namespace std; - -StoreStatus::StoreStatus(const std::string& d) - : state(STORE_STATE_NO_STORE), dataDir(d) -{} - -namespace { - -const char* SUBDIR="cluster"; -const char* STORE_STATUS="store.status"; - -string readFile(const fs::path& path) { - fs::ifstream is; - is.exceptions(std::ios::badbit | std::ios::failbit); - is.open(path); - // get length of file: - is.seekg (0, ios::end); - size_t length = is.tellg(); - is.seekg (0, ios::beg); - // load data - boost::scoped_array<char> buffer(new char[length]); - is.read(buffer.get(), length); - is.close(); - return string(buffer.get(), length); -} - -void writeFile(const fs::path& path, const string& data) { - fs::ofstream os; - os.exceptions(std::ios::badbit | std::ios::failbit); - os.open(path); - os.write(data.data(), data.size()); - os.close(); -} - -} // namespace - - -void StoreStatus::load() { - if (dataDir.empty()) { - throw Exception(QPID_MSG("No data-dir: When a store is loaded together with clustering, --data-dir must be specified.")); - } - try { - fs::path dir = fs::path(dataDir, fs::native)/SUBDIR; - create_directory(dir); - fs::path file = dir/STORE_STATUS; - if (fs::exists(file)) { - string data = readFile(file); - istringstream is(data); - is.exceptions(std::ios::badbit | std::ios::failbit); - is >> ws >> clusterId >> ws >> shutdownId; - if (!clusterId) - throw Exception(QPID_MSG("Invalid cluster store state, no cluster-id")); - if (shutdownId) state = STORE_STATE_CLEAN_STORE; - else state = STORE_STATE_DIRTY_STORE; - } - else { // Starting from empty store - clusterId = Uuid(true); - save(); - state = STORE_STATE_EMPTY_STORE; - } - } - catch (const std::exception&e) { - throw Exception(QPID_MSG("Cannot load cluster store status: " << e.what())); - } -} - -void StoreStatus::save() { - if (dataDir.empty()) return; - try { - ostringstream os; - os << clusterId << endl << shutdownId << endl; - fs::path file = fs::path(dataDir, fs::native)/SUBDIR/STORE_STATUS; - writeFile(file, os.str()); - } - catch (const std::exception& e) { - throw Exception(QPID_MSG("Cannot save cluster store status: " << e.what())); - } -} - -bool StoreStatus::hasStore() const { - return state != framing::cluster::STORE_STATE_NO_STORE; -} - -void StoreStatus::dirty() { - assert(hasStore()); - if (shutdownId) { - shutdownId = Uuid(); - save(); - } - state = STORE_STATE_DIRTY_STORE; -} - -void StoreStatus::clean(const Uuid& shutdownId_) { - assert(hasStore()); - assert(shutdownId_); - if (shutdownId_ != shutdownId) { - shutdownId = shutdownId_; - save(); - } - state = STORE_STATE_CLEAN_STORE; -} - -void StoreStatus::setClusterId(const Uuid& clusterId_) { - clusterId = clusterId_; - save(); -} - -const char* stateName(StoreState s) { - switch (s) { - case STORE_STATE_NO_STORE: return "none"; - case STORE_STATE_EMPTY_STORE: return "empty"; - case STORE_STATE_DIRTY_STORE: return "dirty"; - case STORE_STATE_CLEAN_STORE: return "clean"; - } - assert(0); - return "unknown"; -} - -ostream& operator<<(ostream& o, framing::cluster::StoreState s) { return o << stateName(s); } - -ostream& operator<<(ostream& o, const StoreStatus& s) { - o << s.getState(); - if (s.getState() == STORE_STATE_DIRTY_STORE) - o << " cluster-id=" << s.getClusterId(); - if (s.getState() == STORE_STATE_CLEAN_STORE) { - o << " cluster-id=" << s.getClusterId() - << " shutdown-id=" << s.getShutdownId(); - } - return o; -} - -}} // namespace qpid::cluster - diff --git a/cpp/src/qpid/cluster/StoreStatus.h b/cpp/src/qpid/cluster/StoreStatus.h deleted file mode 100644 index 7442fcf02c..0000000000 --- a/cpp/src/qpid/cluster/StoreStatus.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef QPID_CLUSTER_STORESTATE_H -#define QPID_CLUSTER_STORESTATE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/Uuid.h" -#include "qpid/framing/SequenceNumber.h" -#include "qpid/framing/enum.h" -#include <iosfwd> - -namespace qpid { -namespace cluster { - -/** - * State of the store for cluster purposes. - */ -class StoreStatus -{ - public: - typedef framing::Uuid Uuid; - typedef framing::cluster::StoreState StoreState; - - StoreStatus(const std::string& dir); - - framing::cluster::StoreState getState() const { return state; } - - const Uuid& getClusterId() const { return clusterId; } - void setClusterId(const Uuid&); - const Uuid& getShutdownId() const { return shutdownId; } - - void load(); - void dirty(); // Mark the store in use. - void clean(const Uuid& shutdownId); // Mark the store clean. - bool hasStore() const; - - private: - void save(); - - framing::cluster::StoreState state; - Uuid clusterId, shutdownId; - std::string dataDir; -}; - -const char* stateName(framing::cluster::StoreState); -std::ostream& operator<<(std::ostream&, framing::cluster::StoreState); -std::ostream& operator<<(std::ostream&, const StoreStatus&); -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_STORESTATE_H*/ diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp deleted file mode 100644 index a15c14ff48..0000000000 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ /dev/null @@ -1,641 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/amqp_0_10/Codecs.h" -#include "qpid/cluster/UpdateClient.h" -#include "qpid/cluster/Cluster.h" -#include "qpid/cluster/ClusterMap.h" -#include "qpid/cluster/Connection.h" -#include "qpid/cluster/Decoder.h" -#include "qpid/cluster/ExpiryPolicy.h" -#include "qpid/cluster/UpdateDataExchange.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/ConnectionImpl.h" -#include "qpid/client/Future.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Fairshare.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/LinkRegistry.h" -#include "qpid/broker/Bridge.h" -#include "qpid/broker/Link.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/SessionHandler.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/TxOpVisitor.h" -#include "qpid/broker/DtxAck.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/TxPublish.h" -#include "qpid/broker/RecoveredDequeue.h" -#include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/broker/StatefulQueueObserver.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/ClusterConnectionMembershipBody.h" -#include "qpid/framing/ClusterConnectionShadowReadyBody.h" -#include "qpid/framing/ClusterConnectionSessionStateBody.h" -#include "qpid/framing/ClusterConnectionConsumerStateBody.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/TypeCode.h" -#include "qpid/log/Statement.h" -#include "qpid/types/Variant.h" -#include "qpid/Url.h" -#include "qmf/org/apache/qpid/broker/ManagementSetupState.h" -#include <boost/bind.hpp> -#include <boost/cast.hpp> -#include <algorithm> -#include <sstream> - -namespace qpid { -namespace cluster { - -using amqp_0_10::ListCodec; -using broker::Broker; -using broker::Exchange; -using broker::Queue; -using broker::QueueBinding; -using broker::Message; -using broker::SemanticState; -using types::Variant; - -using namespace framing; -namespace arg=client::arg; -using client::SessionBase_0_10Access; - -std::ostream& operator<<(std::ostream& o, const UpdateClient& c) { - return o << "cluster(" << c.updaterId << " UPDATER)"; -} - -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection, public framing::FrameHandler -{ - boost::shared_ptr<qpid::client::ConnectionImpl> connection; - - ClusterConnectionProxy(client::Connection c) : - AMQP_AllProxy::ClusterConnection(*static_cast<framing::FrameHandler*>(this)), - connection(client::ConnectionAccess::getImpl(c)) {} - ClusterConnectionProxy(client::AsyncSession s) : - AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} - - void handle(framing::AMQFrame& f) - { - assert(connection); - connection->expand(f.encodedSize(), false); - connection->handle(f); - } -}; - -// Create a connection with special version that marks it as a catch-up connection. -client::Connection UpdateClient::catchUpConnection() { - client::Connection c; - client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); - return c; -} - -// Send a control body directly to the session. -void send(client::AsyncSession& s, const AMQBody& body) { - client::SessionBase_0_10Access sb(s); - sb.get()->send(body); -} - -// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. - -UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, - const Cluster::ConnectionVector& cons, Decoder& decoder_, - const boost::function<void()>& ok, - const boost::function<void(const std::exception&)>& fail, - const client::ConnectionSettings& cs -) - : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - expiry(expiry_), connections(cons), decoder(decoder_), - connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail), connectionSettings(cs) -{} - -UpdateClient::~UpdateClient() {} - -// Reserved exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -const std::string UpdateClient::UPDATE("qpid.cluster-update"); - -void UpdateClient::run() { - try { - connection.open(updateeUrl, connectionSettings); - session = connection.newSession(UPDATE); - update(); - done(); - } catch (const std::exception& e) { - failed(e); - } - delete this; -} - -void UpdateClient::update() { - QPID_LOG(debug, *this << " updating state to " << updateeId - << " at " << updateeUrl); - Broker& b = updaterBroker; - - updateManagementSetupState(); - - b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); - b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, this, _1)); - - // Update queue is used to transfer acquired messages that are no - // longer on their original queue. - session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); - session.sync(); - std::for_each(connections.begin(), connections.end(), - boost::bind(&UpdateClient::updateConnection, this, _1)); - session.queueDelete(arg::queue=UPDATE); - - // some Queue Observers need session state & msgs synced first, so sync observers now - b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, this, _1)); - - // Update queue listeners: must come after sessions so consumerNumbering is populated - b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, this, _1)); - - ClusterConnectionProxy(session).expiryId(expiry.getId()); - updateLinks(); - updateManagementAgent(); - - session.close(); - - ClusterConnectionMembershipBody membership; - map.toMethodBody(membership); - AMQFrame frame(membership); - client::ConnectionAccess::getImpl(connection)->expand(frame.encodedSize(), false); - client::ConnectionAccess::getImpl(connection)->handle(frame); - - // NOTE: connection will be closed from the other end, don't close - // it here as that causes a race. - - // TODO aconway 2010-03-15: This sleep avoids the race condition - // described in // https://bugzilla.redhat.com/show_bug.cgi?id=568831. - // It allows the connection to fully close before destroying the - // Connection object. Remove when the bug is fixed. - // - sys::usleep(10*1000); - - QPID_LOG(debug, *this << " update completed to " << updateeId << " at " << updateeUrl); -} - -namespace { -template <class T> std::string encode(const T& t) { - std::string encoded; - encoded.resize(t.encodedSize()); - framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - t.encode(buf); - return encoded; -} - -template <class T> std::string encode(const T& t, bool encodeKind) { - std::string encoded; - encoded.resize(t.encodedSize()); - framing::Buffer buf(const_cast<char*>(encoded.data()), encoded.size()); - t.encode(buf, encodeKind); - return encoded; -} -} // namespace - - -// Propagate the management state -void UpdateClient::updateManagementSetupState() -{ - management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (!agent) return; - - QPID_LOG(debug, *this << " updating management setup-state."); - std::string vendor, product, instance; - agent->getName(vendor, product, instance); - ClusterConnectionProxy(session).managementSetupState( - agent->getNextObjectId(), agent->getBootSequence(), agent->getUuid(), - vendor, product, instance); -} - -void UpdateClient::updateManagementAgent() -{ - management::ManagementAgent* agent = updaterBroker.getManagementAgent(); - if (!agent) return; - string data; - - QPID_LOG(debug, *this << " updating management schemas. ") - agent->exportSchemas(data); - session.messageTransfer( - arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY), - arg::destination=UpdateDataExchange::EXCHANGE_NAME); - - QPID_LOG(debug, *this << " updating management agents. ") - agent->exportAgents(data); - session.messageTransfer( - arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_AGENTS_KEY), - arg::destination=UpdateDataExchange::EXCHANGE_NAME); - - QPID_LOG(debug, *this << " updating management deleted objects. ") - typedef management::ManagementAgent::DeletedObjectList DeletedObjectList; - DeletedObjectList deleted; - agent->exportDeletedObjects(deleted); - Variant::List list; - for (DeletedObjectList::iterator i = deleted.begin(); i != deleted.end(); ++i) { - string encoded; - (*i)->encode(encoded); - list.push_back(encoded); - } - ListCodec::encode(list, data); - session.messageTransfer( - arg::content=client::Message(data, UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY), - arg::destination=UpdateDataExchange::EXCHANGE_NAME); -} - -void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, *this << " updating exchange " << ex->getName()); - ClusterConnectionProxy(session).exchange(encode(*ex)); -} - -/** Bind a queue to the update exchange and update messges to it - * setting the message possition as needed. - */ -class MessageUpdater { - std::string queue; - bool haveLastPos; - framing::SequenceNumber lastPos; - client::AsyncSession session; - ExpiryPolicy& expiry; - - public: - - MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { - session.exchangeBind(queue, UpdateClient::UPDATE); - } - - ~MessageUpdater() { - try { - session.exchangeUnbind(queue, UpdateClient::UPDATE); - } - catch (const std::exception& e) { - // Don't throw in a destructor. - QPID_LOG(error, "Unbinding update queue " << queue << ": " << e.what()); - } - } - - - void updateQueuedMessage(const broker::QueuedMessage& message) { - // Send the queue position if necessary. - if (!haveLastPos || message.position - lastPos != 1) { - ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); - haveLastPos = true; - } - lastPos = message.position; - - // Send the expiry ID if necessary. - if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { - boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); - ClusterConnectionProxy(session).expiryId(expiryId?*expiryId:0); - } - - // We can't send a broker::Message via the normal client API, - // and it would be expensive to copy it into a client::Message - // so we go a bit under the client API covers here. - // - SessionBase_0_10Access sb(session); - // Disable client code that clears the delivery-properties.exchange - sb.get()->setDoClearDeliveryPropertiesExchange(false); - framing::MessageTransferBody transfer( - *message.payload->getFrames().as<framing::MessageTransferBody>()); - transfer.setDestination(UpdateClient::UPDATE); - - sb.get()->send(transfer, message.payload->getFrames(), - !message.payload->isContentReleased()); - if (message.payload->isContentReleased()){ - uint16_t maxFrameSize = sb.get()->getConnection()->getNegotiatedSettings().maxFrameSize; - uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); - bool morecontent = true; - for (uint64_t offset = 0; morecontent; offset += maxContentSize) - { - AMQFrame frame((AMQContentBody())); - morecontent = message.payload->getContentFrame(*(message.queue), frame, maxContentSize, offset); - sb.get()->sendRawFrame(frame); - } - } - } - - void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { - updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); - } -}; - -void UpdateClient::updateQueue(client::AsyncSession& s, const boost::shared_ptr<Queue>& q) { - broker::Exchange::shared_ptr alternateExchange = q->getAlternateExchange(); - s.queueDeclare( - arg::queue = q->getName(), - arg::durable = q->isDurable(), - arg::autoDelete = q->isAutoDelete(), - arg::alternateExchange = alternateExchange ? alternateExchange->getName() : "", - arg::arguments = q->getSettings(), - arg::exclusive = q->hasExclusiveOwner() - ); - MessageUpdater updater(q->getName(), s, expiry); - q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); - q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, s, q->getName(), _1)); - ClusterConnectionProxy(s).queuePosition(q->getName(), q->getPosition()); - uint priority, count; - if (qpid::broker::Fairshare::getState(q->getMessages(), priority, count)) { - ClusterConnectionProxy(s).queueFairshareState(q->getName(), priority, count); - } -} - -void UpdateClient::updateExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - QPID_LOG(debug, *this << " updating exclusive queue " << q->getName() << " on " << shadowSession.getId()); - updateQueue(shadowSession, q); -} - -void UpdateClient::updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>& q) { - if (!q->hasExclusiveOwner()) { - QPID_LOG(debug, *this << " updating queue " << q->getName()); - updateQueue(session, q); - }//else queue will be updated as part of session state of owning session -} - -void UpdateClient::updateBinding(client::AsyncSession& s, const std::string& queue, const QueueBinding& binding) { - s.exchangeBind(queue, binding.exchange, binding.key, binding.args); -} - -void UpdateClient::updateOutputTask(const sys::OutputTask* task) { - const SemanticState::ConsumerImpl* cci = - boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task); - SemanticState::ConsumerImpl* ci = const_cast<SemanticState::ConsumerImpl*>(cci); - uint16_t channel = ci->getParent().getSession().getChannel(); - ClusterConnectionProxy(shadowConnection).outputTask(channel, ci->getName()); - QPID_LOG(debug, *this << " updating output task " << ci->getName() - << " channel=" << channel); -} - -void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { - QPID_LOG(debug, *this << " updating connection " << *updateConnection); - assert(updateConnection->getBrokerConnection()); - broker::Connection& bc = *updateConnection->getBrokerConnection(); - - // Send the management ID first on the main connection. - std::string mgmtId = updateConnection->getBrokerConnection()->getMgmtId(); - ClusterConnectionProxy(session).shadowPrepare(mgmtId); - // Make sure its received before opening shadow connection - session.sync(); - - // Open shadow connection and update it. - shadowConnection = catchUpConnection(); - - connectionSettings.maxFrameSize = bc.getFrameMax(); - shadowConnection.open(updateeUrl, connectionSettings); - ClusterConnectionProxy(shadowConnection).shadowSetUser(bc.getUserId()); - - bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); - // Safe to use decoder here because we are stalled for update. - std::pair<const char*, size_t> fragment = decoder.get(updateConnection->getId()).getFragment(); - bc.getOutputTasks().eachOutput( - boost::bind(&UpdateClient::updateOutputTask, this, _1)); - ClusterConnectionProxy(shadowConnection).shadowReady( - updateConnection->getId().getMember(), - updateConnection->getId().getNumber(), - bc.getMgmtId(), - bc.getUserId(), - string(fragment.first, fragment.second), - updateConnection->getOutput().getSendMax() - ); - shadowConnection.close(); - QPID_LOG(debug, *this << " updated connection " << *updateConnection); -} - -void UpdateClient::updateSession(broker::SessionHandler& sh) { - broker::SessionState* ss = sh.getSession(); - if (!ss) return; // no session. - - QPID_LOG(debug, *this << " updating session " << ss->getId()); - - // Create a client session to update session state. - boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); - boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); - simpl->disableAutoDetach(); - client::SessionBase_0_10Access(shadowSession).set(simpl); - AMQP_AllProxy::ClusterConnection proxy(simpl->out); - - // Re-create session state on remote connection. - - QPID_LOG(debug, *this << " updating exclusive queues."); - ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue, this, _1)); - - QPID_LOG(debug, *this << " updating consumers."); - ss->getSemanticState().eachConsumer( - boost::bind(&UpdateClient::updateConsumer, this, _1)); - - QPID_LOG(debug, *this << " updating unacknowledged messages."); - broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); - std::for_each(drs.begin(), drs.end(), - boost::bind(&UpdateClient::updateUnacked, this, _1)); - - updateTxState(ss->getSemanticState()); // Tx transaction state. - - // Adjust command counter for message in progress, will be sent after state update. - boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); - SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) - --received; - - // Sync the session to ensure all responses from broker have been processed. - shadowSession.sync(); - - // Reset command-sequence state. - proxy.sessionState( - ss->senderGetReplayPoint().command, - ss->senderGetCommandPoint().command, - ss->senderGetIncomplete(), - std::max(received, ss->receiverGetExpected().command), - received, - ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() - ); - - // Send frames for partial message in progress. - if (inProgress) { - inProgress->getFrames().map(simpl->out); - } - QPID_LOG(debug, *this << " updated session " << sh.getSession()->getId()); -} - -void UpdateClient::updateConsumer( - const broker::SemanticState::ConsumerImpl::shared_ptr& ci) -{ - QPID_LOG(debug, *this << " updating consumer " << ci->getName() << " on " - << shadowSession.getId()); - - using namespace message; - shadowSession.messageSubscribe( - arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), - arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, - arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, - arg::exclusive = ci->isExclusive(), - arg::resumeId = ci->getResumeId(), - arg::resumeTtl = ci->getResumeTtl(), - arg::arguments = ci->getArguments() - ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionProxy(shadowSession).consumerState( - ci->getName(), - ci->isBlocked(), - ci->isNotifyEnabled(), - ci->position - ); - consumerNumbering.add(ci.get()); - - QPID_LOG(debug, *this << " updated consumer " << ci->getName() - << " on " << shadowSession.getId()); -} - -void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { - // If the message is acquired then it is no longer on the - // updatees queue, put it on the update queue for updatee to pick up. - // - MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); - } - ClusterConnectionProxy(shadowSession).deliveryRecord( - dr.getQueue()->getName(), - dr.getMessage().position, - dr.getTag(), - dr.getId(), - dr.isAcquired(), - dr.isAccepted(), - dr.isCancelled(), - dr.isComplete(), - dr.isEnded(), - dr.isWindowing(), - dr.getQueue()->isEnqueued(dr.getMessage()), - dr.getCredit() - ); -} - -class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { - public: - TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) - : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} - - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); - } - - void operator()(const broker::RecoveredDequeue& rdeq) { - updateMessage(rdeq.getMessage()); - proxy.txEnqueue(rdeq.getQueue()->getName()); - } - - void operator()(const broker::RecoveredEnqueue& renq) { - updateMessage(renq.getMessage()); - proxy.txEnqueue(renq.getQueue()->getName()); - } - - void operator()(const broker::TxAccept& txAccept) { - proxy.txAccept(txAccept.getAcked()); - } - - void operator()(const broker::TxPublish& txPub) { - updateMessage(txPub.getMessage()); - typedef std::list<Queue::shared_ptr> QueueList; - const QueueList& qlist = txPub.getQueues(); - Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); - proxy.txPublish(qarray, txPub.delivered); - } - - private: - UpdateClient& parent; - client::AsyncSession session; - ClusterConnectionProxy proxy; -}; - -void UpdateClient::updateTxState(broker::SemanticState& s) { - QPID_LOG(debug, *this << " updating TX transaction state."); - ClusterConnectionProxy proxy(shadowSession); - proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { - proxy.txStart(); - TxOpUpdater updater(*this, shadowSession, expiry); - txBuffer->accept(updater); - proxy.txEnd(); - } -} - -void UpdateClient::updateQueueListeners(const boost::shared_ptr<broker::Queue>& queue) { - queue->getListeners().eachListener( - boost::bind(&UpdateClient::updateQueueListener, this, queue->getName(), _1)); -} - -void UpdateClient::updateQueueListener(std::string& q, - const boost::shared_ptr<broker::Consumer>& c) -{ - SemanticState::ConsumerImpl* ci = dynamic_cast<SemanticState::ConsumerImpl*>(c.get()); - size_t n = consumerNumbering[ci]; - if (n >= consumerNumbering.size()) - throw Exception(QPID_MSG("Unexpected listener on queue " << q)); - ClusterConnectionProxy(session).addQueueListener(q, n); -} - -void UpdateClient::updateLinks() { - broker::LinkRegistry& links = updaterBroker.getLinks(); - links.eachLink(boost::bind(&UpdateClient::updateLink, this, _1)); - links.eachBridge(boost::bind(&UpdateClient::updateBridge, this, _1)); -} - -void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { - QPID_LOG(debug, *this << " updating link " - << link->getHost() << ":" << link->getPort()); - ClusterConnectionProxy(session).config(encode(*link)); -} - -void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { - QPID_LOG(debug, *this << " updating bridge " << bridge->getName()); - ClusterConnectionProxy(session).config(encode(*bridge)); -} - -void UpdateClient::updateQueueObservers(const boost::shared_ptr<broker::Queue>& q) -{ - q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1)); -} - -void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q, - boost::shared_ptr<broker::QueueObserver> o) -{ - qpid::framing::FieldTable state; - broker::StatefulQueueObserver *so = dynamic_cast<broker::StatefulQueueObserver *>(o.get()); - if (so) { - so->getState( state ); - std::string id(so->getId()); - QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s observer " << id); - ClusterConnectionProxy(session).queueObserverState( q->getName(), id, state ); - } -} - - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h deleted file mode 100644 index b72d090d73..0000000000 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ /dev/null @@ -1,133 +0,0 @@ -#ifndef QPID_CLUSTER_UPDATECLIENT_H -#define QPID_CLUSTER_UPDATECLIENT_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/ClusterMap.h" -#include "qpid/cluster/Numbering.h" -#include "qpid/client/Connection.h" -#include "qpid/client/ConnectionSettings.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/sys/Runnable.h" -#include <boost/shared_ptr.hpp> -#include <iosfwd> - -namespace qpid { - -struct Url; - -namespace broker { - -class Broker; -class Queue; -class Exchange; -class QueueBindings; -struct QueueBinding; -struct QueuedMessage; -class SessionHandler; -class DeliveryRecord; -class SessionState; -class SemanticState; -class Decoder; -class Link; -class Bridge; -class QueueObserver; - -} // namespace broker - -namespace cluster { - -class Cluster; -class Connection; -class ClusterMap; -class Decoder; -class ExpiryPolicy; - -/** - * A client that updates the contents of a local broker to a remote one using AMQP. - */ -class UpdateClient : public sys::Runnable { - public: - static const std::string UPDATE; // Name for special update queue and exchange. - static client::Connection catchUpConnection(); - - UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, - const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, - const boost::function<void()>& done, - const boost::function<void(const std::exception&)>& fail, - const client::ConnectionSettings& - ); - - ~UpdateClient(); - void update(); - void run(); // Will delete this when finished. - - void updateUnacked(const broker::DeliveryRecord&); - - private: - void updateQueue(client::AsyncSession&, const boost::shared_ptr<broker::Queue>&); - void updateNonExclusiveQueue(const boost::shared_ptr<broker::Queue>&); - void updateExclusiveQueue(const boost::shared_ptr<broker::Queue>&); - void updateExchange(const boost::shared_ptr<broker::Exchange>&); - void updateMessage(const broker::QueuedMessage&); - void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); - void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding& binding); - void updateConnection(const boost::intrusive_ptr<Connection>& connection); - void updateSession(broker::SessionHandler& s); - void updateTxState(broker::SemanticState& s); - void updateOutputTask(const sys::OutputTask* task); - void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&); - void updateQueueListeners(const boost::shared_ptr<broker::Queue>&); - void updateQueueListener(std::string& q, const boost::shared_ptr<broker::Consumer>& c); - void updateManagementSetupState(); - void updateManagementAgent(); - void updateLinks(); - void updateLink(const boost::shared_ptr<broker::Link>&); - void updateBridge(const boost::shared_ptr<broker::Bridge>&); - void updateQueueObservers(const boost::shared_ptr<broker::Queue>&); - void updateObserver(const boost::shared_ptr<broker::Queue>&, boost::shared_ptr<broker::QueueObserver>); - - - Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering; - MemberId updaterId; - MemberId updateeId; - Url updateeUrl; - broker::Broker& updaterBroker; - ClusterMap map; - ExpiryPolicy& expiry; - std::vector<boost::intrusive_ptr<Connection> > connections; - Decoder& decoder; - client::Connection connection, shadowConnection; - client::AsyncSession session, shadowSession; - boost::function<void()> done; - boost::function<void(const std::exception& e)> failed; - client::ConnectionSettings connectionSettings; - - friend std::ostream& operator<<(std::ostream&, const UpdateClient&); -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.cpp b/cpp/src/qpid/cluster/UpdateDataExchange.cpp deleted file mode 100644 index e5cd82e3d3..0000000000 --- a/cpp/src/qpid/cluster/UpdateDataExchange.cpp +++ /dev/null @@ -1,77 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "UpdateDataExchange.h" -#include "Cluster.h" -#include "qpid/amqp_0_10/Codecs.h" -#include "qpid/broker/Deliverable.h" -#include "qpid/broker/Message.h" -#include "qpid/log/Statement.h" -#include "qpid/management/ManagementAgent.h" -#include "qpid/types/Variant.h" - -namespace qpid { -namespace cluster { - -const std::string UpdateDataExchange::EXCHANGE_NAME("qpid.cluster-update-data"); -const std::string UpdateDataExchange::EXCHANGE_TYPE("qpid.cluster-update-data"); -const std::string UpdateDataExchange::MANAGEMENT_AGENTS_KEY("management-agents"); -const std::string UpdateDataExchange::MANAGEMENT_SCHEMAS_KEY("management-schemas"); -const std::string UpdateDataExchange::MANAGEMENT_DELETED_OBJECTS_KEY("management-deleted-objects"); - -UpdateDataExchange::UpdateDataExchange(Cluster& cluster) : - Exchange(EXCHANGE_NAME, &cluster) -{} - -void UpdateDataExchange::route(broker::Deliverable& msg, const std::string& routingKey, - const qpid::framing::FieldTable* ) -{ - std::string data = msg.getMessage().getFrames().getContent(); - if (routingKey == MANAGEMENT_AGENTS_KEY) managementAgents = data; - else if (routingKey == MANAGEMENT_SCHEMAS_KEY) managementSchemas = data; - else if (routingKey == MANAGEMENT_DELETED_OBJECTS_KEY) managementDeletedObjects = data; - else throw Exception( - QPID_MSG("Cluster update-data exchange received unknown routing-key: " - << routingKey)); -} - -void UpdateDataExchange::updateManagementAgent(management::ManagementAgent* agent) { - if (!agent) return; - - framing::Buffer buf1(const_cast<char*>(managementAgents.data()), managementAgents.size()); - agent->importAgents(buf1); - - framing::Buffer buf2(const_cast<char*>(managementSchemas.data()), managementSchemas.size()); - agent->importSchemas(buf2); - - using amqp_0_10::ListCodec; - using types::Variant; - Variant::List encoded; - ListCodec::decode(managementDeletedObjects, encoded); - management::ManagementAgent::DeletedObjectList objects; - for (Variant::List::iterator i = encoded.begin(); i != encoded.end(); ++i) { - objects.push_back(management::ManagementAgent::DeletedObject::shared_ptr( - new management::ManagementAgent::DeletedObject(*i))); - } - agent->importDeletedObjects(objects); -} - - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateDataExchange.h b/cpp/src/qpid/cluster/UpdateDataExchange.h deleted file mode 100644 index d2f6c35ad0..0000000000 --- a/cpp/src/qpid/cluster/UpdateDataExchange.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef QPID_CLUSTER_UPDATEDATAEXCHANGE_H -#define QPID_CLUSTER_UPDATEDATAEXCHANGE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/Exchange.h" -#include "types.h" -#include <iosfwd> - -namespace qpid { - -namespace management { -class ManagementAgent; -} - -namespace cluster { -class Cluster; - -/** - * An exchange used to send data that is to large for a control - * during update. The routing key indicates the type of data. - */ -class UpdateDataExchange : public broker::Exchange -{ - public: - static const std::string EXCHANGE_NAME; - static const std::string EXCHANGE_TYPE; - static const std::string MANAGEMENT_AGENTS_KEY; - static const std::string MANAGEMENT_SCHEMAS_KEY; - static const std::string MANAGEMENT_DELETED_OBJECTS_KEY; - - UpdateDataExchange(Cluster& parent); - - void route(broker::Deliverable& msg, const std::string& routingKey, - const framing::FieldTable* args); - - // Not implemented - std::string getType() const { return EXCHANGE_TYPE; } - - bool bind(boost::shared_ptr<broker::Queue>, - const std::string&, - const qpid::framing::FieldTable*) - { return false; } - - bool unbind(boost::shared_ptr<broker::Queue>, - const std::string&, - const qpid::framing::FieldTable*) - { return false; } - - bool isBound(boost::shared_ptr<broker::Queue>, - const std::string*, - const qpid::framing::FieldTable*) - { return false; } - - void updateManagementAgent(management::ManagementAgent* agent); - - private: - std::string managementAgents; - std::string managementSchemas; - std::string managementDeletedObjects; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_UPDATEDATAEXCHANGE_H*/ diff --git a/cpp/src/qpid/cluster/UpdateExchange.cpp b/cpp/src/qpid/cluster/UpdateExchange.cpp deleted file mode 100644 index 11937f296f..0000000000 --- a/cpp/src/qpid/cluster/UpdateExchange.cpp +++ /dev/null @@ -1,47 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/broker/Message.h" -#include "UpdateExchange.h" - -namespace qpid { -namespace cluster { - -using framing::MessageTransferBody; -using framing::DeliveryProperties; - -UpdateExchange::UpdateExchange(management::Manageable* parent) - : broker::Exchange(UpdateClient::UPDATE, parent), - broker::FanOutExchange(UpdateClient::UPDATE, parent) {} - - -void UpdateExchange::setProperties(const boost::intrusive_ptr<broker::Message>& msg) { - MessageTransferBody* transfer = msg->getMethod<MessageTransferBody>(); - assert(transfer); - const DeliveryProperties* props = msg->getProperties<DeliveryProperties>(); - assert(props); - if (props->hasExchange()) - transfer->setDestination(props->getExchange()); - else - transfer->clearDestinationFlag(); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateExchange.h b/cpp/src/qpid/cluster/UpdateExchange.h deleted file mode 100644 index 9d7d9ee5fc..0000000000 --- a/cpp/src/qpid/cluster/UpdateExchange.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef QPID_CLUSTER_UPDATEEXCHANGE_H -#define QPID_CLUSTER_UPDATEEXCHANGE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/UpdateClient.h" -#include "qpid/broker/FanOutExchange.h" - - -namespace qpid { -namespace cluster { - -/** - * A keyless exchange (like fanout exchange) that does not modify - * delivery-properties.exchange but copies it to the MessageTransfer. - */ -class UpdateExchange : public broker::FanOutExchange -{ - public: - UpdateExchange(management::Manageable* parent); - void setProperties(const boost::intrusive_ptr<broker::Message>&); -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_UPDATEEXCHANGE_H*/ diff --git a/cpp/src/qpid/cluster/UpdateReceiver.h b/cpp/src/qpid/cluster/UpdateReceiver.h deleted file mode 100644 index 7e8ce47662..0000000000 --- a/cpp/src/qpid/cluster/UpdateReceiver.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef QPID_CLUSTER_UPDATESTATE_H -#define QPID_CLUSTER_UPDATESTATE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Numbering.h" -#include "qpid/broker/SemanticState.h" - -namespace qpid { -namespace cluster { - -/** - * Cluster-wide state used when receiving an update. - */ -class UpdateReceiver { - public: - /** Numbering used to identify Queue listeners as consumers */ - typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering; - ConsumerNumbering consumerNumbering; - - /** Management-id for the next shadow connection */ - std::string nextShadowMgmtId; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_UPDATESTATE_H*/ diff --git a/cpp/src/qpid/cluster/WatchDogPlugin.cpp b/cpp/src/qpid/cluster/WatchDogPlugin.cpp deleted file mode 100644 index 57ba5cf2fd..0000000000 --- a/cpp/src/qpid/cluster/WatchDogPlugin.cpp +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/**@file - - The watchdog plug-in will kill the qpidd broker process if it - becomes stuck for longer than a configured interval. - - If the watchdog plugin is loaded and the --watchdog-interval=N - option is set then the broker starts a watchdog process and signals - it every N/2 seconds. - - The watchdog process runs a very simple program that starts a timer - for N seconds, and resets the timer to N seconds whenever it is - signalled by the broker. If the timer ever reaches 0 the watchdog - kills the broker process (with kill -9) and exits. - - This is useful in a cluster setting because in some insttances - (e.g. while resolving an error) it's possible for a stuck process - to hang other cluster members that are waiting for it to send a - message. Using the watchdog, the stuck process is terminated and - removed fromt the cluster allowing other members to continue and - clients of the stuck process to fail over to other members. - -*/ -#include "config.h" -#include "qpid/Plugin.h" -#include "qpid/Options.h" -#include "qpid/log/Statement.h" -#include "qpid/broker/Broker.h" -#include "qpid/sys/Timer.h" -#include "qpid/sys/Fork.h" -#include <sys/types.h> -#include <sys/wait.h> -#include <signal.h> - -namespace qpid { -namespace cluster { - -using broker::Broker; - -struct Settings { - Settings() : interval(0) {} - int interval; -}; - -struct WatchDogOptions : public qpid::Options { - Settings& settings; - - WatchDogOptions(Settings& s) : settings(s) { - addOptions() - ("watchdog-interval", optValue(settings.interval, "N"), - "broker is automatically killed if it is hung for more than \ - N seconds. 0 disables watchdog."); - } -}; - -struct WatchDogTask : public sys::TimerTask { - int pid; - sys::Timer& timer; - int interval; - - WatchDogTask(int pid_, sys::Timer& t, int _interval) - : TimerTask(_interval*sys::TIME_SEC/2,"WatchDog"), pid(pid_), timer(t), interval(_interval) {} - - void fire() { - timer.add (new WatchDogTask(pid, timer, interval)); - QPID_LOG(debug, "Sending keepalive signal to watchdog"); - ::kill(pid, SIGUSR1); - } -}; - -struct WatchDogPlugin : public qpid::Plugin, public qpid::sys::Fork { - Settings settings; - WatchDogOptions options; - Broker* broker; - int watchdogPid; - - WatchDogPlugin() : options(settings), broker(0), watchdogPid(0) {} - - ~WatchDogPlugin() { - if (watchdogPid) ::kill(watchdogPid, SIGTERM); - ::waitpid(watchdogPid, 0, 0); - } - - Options* getOptions() { return &options; } - - void earlyInitialize(qpid::Plugin::Target& target) { - broker = dynamic_cast<Broker*>(&target); - if (broker && settings.interval) { - QPID_LOG(notice, "Starting watchdog process with interval of " << - settings.interval << " seconds"); - fork(); - } - } - - void initialize(Target&) {} - - protected: - - void child() { // Child of fork - const char* watchdog = ::getenv("QPID_WATCHDOG_EXEC"); // For use in tests - if (!watchdog) watchdog=QPID_LIBEXEC_DIR "/qpidd_watchdog"; - std::string interval = boost::lexical_cast<std::string>(settings.interval); - ::execl(watchdog, watchdog, interval.c_str(), NULL); - QPID_LOG(critical, "Failed to exec watchdog program " << watchdog ); - ::kill(::getppid(), SIGKILL); - exit(1); - } - - void parent(int pid) { // Parent of fork - watchdogPid = pid; - broker->getTimer().add( - new WatchDogTask(watchdogPid, broker->getTimer(), settings.interval)); - // TODO aconway 2009-08-10: to be extra safe, we could monitor - // the watchdog child and re-start it if it exits. - } -}; - -static WatchDogPlugin instance; // Static initialization. - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/management-schema.xml b/cpp/src/qpid/cluster/management-schema.xml deleted file mode 100644 index a6292e9113..0000000000 --- a/cpp/src/qpid/cluster/management-schema.xml +++ /dev/null @@ -1,61 +0,0 @@ -<schema package="org.apache.qpid.cluster"> - - <!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - --> - - <!-- Type information: - -Numeric types with "_wm" suffix are watermarked numbers. These are compound -values containing a current value, and a low and high water mark for the reporting -interval. The low and high water marks are set to the current value at the -beginning of each interval and track the minimum and maximum values of the statistic -over the interval respectively. - -Access rights for configuration elements: - -RO => Read Only -RC => Read/Create, can be set at create time only, read-only thereafter -RW => Read/Write - -If access rights are omitted for a property, they are assumed to be RO. - - --> - - <class name="Cluster"> - <property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/> - <property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/> - <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/> - <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/> - <property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/> - <property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/> - <property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/> - <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/> - <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/> - - <method name="stopClusterNode"> - <arg name="brokerId" type="sstr" dir="I"/> - </method> - <method name="stopFullCluster"/> - - </class> - - - -</schema> - diff --git a/cpp/src/qpid/cluster/qpidd_watchdog.cpp b/cpp/src/qpid/cluster/qpidd_watchdog.cpp deleted file mode 100644 index 51c5ed4b3f..0000000000 --- a/cpp/src/qpid/cluster/qpidd_watchdog.cpp +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** @file helper executable for WatchDogPlugin.cpp */ - -#include <sys/types.h> -#include <sys/time.h> -#include <signal.h> -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> -#include <limits.h> - -long timeout; - -void killParent(int) { - ::kill(getppid(), SIGKILL); - ::fprintf(stderr, "Watchdog killed unresponsive broker, pid=%d\n", ::getppid()); - ::exit(1); -} - -void resetTimer(int) { - struct ::itimerval itval = { { 0, 0 }, { timeout, 0 } }; - if (::setitimer(ITIMER_REAL, &itval, 0) !=0) { - ::perror("Watchdog failed to set timer"); - killParent(0); - ::exit(1); - } -} - -/** Simple watchdog program: kill parent process if timeout - * expires without a SIGUSR1. - * Will be killed with SIGHUP when parent shuts down. - * Args: timeout in seconds. - */ -int main(int argc, char** argv) { - if(argc != 2 || (timeout = atoi(argv[1])) == 0) { - ::fprintf(stderr, "Usage: %s <timeout_seconds>\n", argv[0]); - ::exit(1); - } - ::signal(SIGUSR1, resetTimer); - ::signal(SIGALRM, killParent); - resetTimer(0); - while (true) { sleep(INT_MAX); } -} diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h deleted file mode 100644 index bfb4fd5b9e..0000000000 --- a/cpp/src/qpid/cluster/types.h +++ /dev/null @@ -1,84 +0,0 @@ -#ifndef QPID_CLUSTER_TYPES_H -#define QPID_CLUSTER_TYPES_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "config.h" -#include "qpid/Url.h" -#include "qpid/RefCounted.h" -#include "qpid/sys/IntegerTypes.h" -#include <boost/intrusive_ptr.hpp> -#include <utility> -#include <iosfwd> -#include <string> - -extern "C" { -#if defined (HAVE_OPENAIS_CPG_H) -# include <openais/cpg.h> -#elif defined (HAVE_COROSYNC_CPG_H) -# include <corosync/cpg.h> -#else -# error "No cpg.h header file available" -#endif -} - -namespace qpid { -namespace cluster { - -class Connection; -typedef boost::intrusive_ptr<Connection> ConnectionPtr; - -/** Types of cluster event. */ -enum EventType { DATA, CONTROL }; - -/** first=node-id, second=pid */ -struct MemberId : std::pair<uint32_t, uint32_t> { - MemberId(uint64_t n=0) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} - MemberId(uint32_t node, uint32_t pid) : std::pair<uint32_t,uint32_t>(node, pid) {} - MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} - MemberId(const std::string&); // Decode from string. - uint32_t getNode() const { return first; } - uint32_t getPid() const { return second; } - operator uint64_t() const { return (uint64_t(first)<<32ull) + second; } - - // MemberId as byte string, network byte order. Not human readable. - std::string str() const; -}; - -inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id == MemberId(caddr); } - -std::ostream& operator<<(std::ostream&, const MemberId&); - -struct ConnectionId : public std::pair<MemberId, uint64_t> { - ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {} - ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {} - MemberId getMember() const { return first; } - uint64_t getNumber() const { return second; } -}; - -std::ostream& operator<<(std::ostream&, const ConnectionId&); - -std::ostream& operator<<(std::ostream&, EventType); - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_TYPES_H*/ |