diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 21:22:50 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:22:50 +0000 |
| commit | 5ba391c494eee906fe7023c211c2eb09d1ceffde (patch) | |
| tree | 3890cc4bb71f2dd1cc6cf84a0f62fc056181de3c /cpp | |
| parent | b2a64a63bd87a6bd2f991e646804a802c34c28a0 (diff) | |
| download | qpid-python-5ba391c494eee906fe7023c211c2eb09d1ceffde.tar.gz | |
QPID-4514: Remove obsolete cluster code: Broker, Connection, Link.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 47 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionHandler.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 2 |
11 files changed, 27 insertions, 114 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 3a5cbb2e41..c1d26f2f5e 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -215,9 +215,6 @@ Broker::Broker(const Broker::Options& conf) : *this), queueCleaner(queues, &timer), recoveryInProgress(false), - recovery(true), - inCluster(false), - clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) @@ -289,18 +286,11 @@ Broker::Broker(const Broker::Options& conf) : exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs()); if (store.get() != 0) { - // The cluster plug-in will setRecovery(false) on all but the first - // broker to join a cluster. - if (getRecovery()) { - RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry); - recoveryInProgress = true; - store->recover(recoverer); - recoveryInProgress = false; - } - else { - QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down"); - store->truncateInit(true); // save old files in subdir - } + RecoveryManagerImpl recoverer( + queues, exchanges, links, dtxManager, protocolRegistry); + recoveryInProgress = true; + store->recover(recoverer); + recoveryInProgress = false; } //ensure standard exchanges exist (done after recovery from store) diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index c6cdc458af..bc35504372 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -184,8 +184,6 @@ class Broker : public sys::Runnable, public Plugin::Target, const Message& msg); std::string federationTag; bool recoveryInProgress; - bool recovery; - bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConsumerFactories consumerFactories; ProtocolRegistry protocolRegistry; @@ -282,22 +280,8 @@ class Broker : public sys::Runnable, public Plugin::Target, static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT; - void setRecovery(bool set) { recovery = set; } - bool getRecovery() const { return recovery; } bool inRecovery() const { return recoveryInProgress; } - /** True of this broker is part of a cluster. - * Only valid after early initialization of plugins is complete. - */ - bool isInCluster() const { return inCluster; } - void setInCluster(bool set) { inCluster = set; } - - /** True if this broker is joining a cluster and in the process of - * receiving a state update. - */ - bool isClusterUpdatee() const { return clusterUpdatee; } - void setClusterUpdatee(bool set) { clusterUpdatee = set; } - management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } /** diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 3cb30a82e3..add93c9c8f 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -85,13 +85,10 @@ Connection::Connection(ConnectionOutputHandler* out_, const qpid::sys::SecuritySettings& external, bool link_, uint64_t objectId_, - bool shadow_, - bool delayManagement, bool authenticated_ ) : ConnectionState(out_, broker_), securitySettings(external), - shadow(shadow_), authenticated(authenticated_), adapter(*this, link_), link(link_), @@ -106,11 +103,6 @@ Connection::Connection(ConnectionOutputHandler* out_, { outboundTracker.wrap(out); broker.getConnectionObservers().connection(*this); - // In a cluster, allow adding the management object to be delayed. - if (!delayManagement) addManagementObject(); -} - -void Connection::addManagementObject() { assert(agent == 0); assert(mgmtObject == 0); Manageable* parent = broker.GetVhostObject(); @@ -119,7 +111,6 @@ void Connection::addManagementObject() { if (agent != 0) { // TODO set last bool true if system connection mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10")); - mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId); } ConnectionState::setUrl(mgmtId); @@ -277,20 +268,6 @@ void Connection::notifyConnectionForced(const string& text) void Connection::setUserId(const string& userId) { ConnectionState::setUserId(userId); - // In a cluster, the cluster code will raise the connect event - // when the connection is replicated to the cluster. - if (!broker.isInCluster()) raiseConnectEvent(); -} - -void Connection::raiseConnectEvent() { - if (mgmtObject != 0) { - mgmtObject->set_authIdentity(userId); - agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId, mgmtObject->get_remoteProperties())); - } - - QPID_LOG_CAT(debug, model, "Create connection. user:" << userId - << " rhost:" << mgmtId ); - } void Connection::setUserProxyAuth(bool b) @@ -488,7 +465,7 @@ void Connection::abort() void Connection::setHeartbeatInterval(uint16_t heartbeat) { setHeartbeat(heartbeat); - if (heartbeat > 0 && !isShadow()) { + if (heartbeat > 0) { if (!heartbeatTimer) { heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this); timer.add(heartbeatTimer); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 2f25b0e3f9..4bc8131f20 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -84,8 +84,6 @@ class Connection : public sys::ConnectionInputHandler, const qpid::sys::SecuritySettings&, bool isLink = false, uint64_t objectId = 0, - bool shadow=false, - bool delayManagement = false, bool authenticated=true); ~Connection (); @@ -130,7 +128,6 @@ class Connection : public sys::ConnectionInputHandler, void notifyConnectionForced(const std::string& text); void setUserId(const std::string& uid); - void raiseConnectEvent(); // credentials for connected client const std::string& getUserId() const { return ConnectionState::getUserId(); } @@ -153,18 +150,9 @@ class Connection : public sys::ConnectionInputHandler, void sendClose(); void setSecureConnection(SecureConnection* secured); - /** True if this is a shadow connection in a cluster. */ - bool isShadow() const { return shadow; } - /** True if this connection is authenticated */ bool isAuthenticated() const { return authenticated; } - // Used by cluster to update connection status - sys::AggregateOutput& getOutputTasks() { return outputTasks; } - - /** Cluster delays adding management object in the constructor then calls this. */ - void addManagementObject(); - const qpid::sys::SecuritySettings& getExternalSecuritySettings() const { return securitySettings; @@ -176,9 +164,6 @@ class Connection : public sys::ConnectionInputHandler, bool isLink() { return link; } void startLinkHeartbeatTimeoutTask(); - // Used by cluster during catch-up, see cluster::OutputInterceptor - void doIoCallbacks(); - void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; } const framing::FieldTable& getClientProperties() const { return clientProperties; } @@ -188,7 +173,6 @@ class Connection : public sys::ConnectionInputHandler, ChannelMap channels; qpid::sys::SecuritySettings securitySettings; - bool shadow; bool authenticated; ConnectionHandler adapter; const bool link; @@ -228,6 +212,7 @@ class Connection : public sys::ConnectionInputHandler, OutboundFrameTracker outboundTracker; void sent(const framing::AMQFrame& f); + void doIoCallbacks(); public: diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 9098c75f0b..bc77f53a9a 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -210,8 +210,6 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel ive = _args.get(qpidIVE); if (ive) { - if (broker && broker->isInCluster()) - throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster"); QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); } } diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 0c18e08cd1..7a3551856b 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -209,9 +209,6 @@ void Link::setStateLH (int newState) state = newState; - if (hideManagement()) - return; - switch (state) { case STATE_WAITING : mgmtObject->set_state("Waiting"); break; @@ -237,8 +234,7 @@ void Link::startConnectionLH () QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: " << e.what()); setStateLH(STATE_WAITING); - if (!hideManagement()) - mgmtObject->set_lastError (e.what()); + mgmtObject->set_lastError (e.what()); } } @@ -249,7 +245,7 @@ void Link::established(Connection* c) addr << host << ":" << port; QPID_LOG (info, "Inter-broker link established to " << addr.str()); - if (!hideManagement() && agent) + if (agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); bool isClosing = false; { @@ -292,7 +288,7 @@ void Link::opened() { Mutex::ScopedLock mutex(lock); if (!connection) return; - if (!hideManagement() && connection->GetManagementObject()) { + if (connection->GetManagementObject()) { mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId()); } @@ -354,13 +350,11 @@ void Link::closed(int, std::string text) connection = 0; - if (!hideManagement()) { - mgmtObject->set_connectionRef(qpid::management::ObjectId()); - if (state == STATE_OPERATIONAL && agent) { - stringstream addr; - addr << host << ":" << port; + mgmtObject->set_connectionRef(qpid::management::ObjectId()); + if (state == STATE_OPERATIONAL && agent) { + stringstream addr; + addr << host << ":" << port; agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); - } } for (Bridges::iterator i = active.begin(); i != active.end(); i++) { @@ -372,8 +366,7 @@ void Link::closed(int, std::string text) if (state != STATE_FAILED && state != STATE_PASSIVE) { setStateLH(STATE_WAITING); - if (!hideManagement()) - mgmtObject->set_lastError (text); + mgmtObject->set_lastError (text); } } @@ -514,14 +507,13 @@ void Link::reconnectLH(const Address& a) port = a.port; transport = a.protocol; - if (!hideManagement()) { - stringstream errorString; - errorString << "Failing over to " << a; - mgmtObject->set_lastError(errorString.str()); - mgmtObject->set_host(host); - mgmtObject->set_port(port); - mgmtObject->set_transport(transport); - } + stringstream errorString; + errorString << "Failing over to " << a; + mgmtObject->set_lastError(errorString.str()); + mgmtObject->set_host(host); + mgmtObject->set_port(port); + mgmtObject->set_transport(transport); + startConnectionLH(); } @@ -538,12 +530,6 @@ bool Link::tryFailoverLH() { return false; } -// Management updates for a link are inconsistent in a cluster, so they are -// suppressed. -bool Link::hideManagement() const { - return !mgmtObject || ( broker && broker->isInCluster()); -} - // Allocate channel from link free pool framing::ChannelId Link::nextChannel() { @@ -585,8 +571,7 @@ void Link::notifyConnectionForced(const string text) { Mutex::ScopedLock mutex(lock); setStateLH(STATE_FAILED); - if (!hideManagement()) - mgmtObject->set_lastError(text); + mgmtObject->set_lastError(text); } void Link::setPersistenceId(uint64_t id) const diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 97511de08f..c8cd710f38 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -107,7 +107,6 @@ class Link : public PersistableConfig, public management::Manageable { void destroy(); // Cleanup connection before link goes away void ioThreadProcessing(); // Called on connection's IO thread by request bool tryFailoverLH(); // Called during maintenance visit - bool hideManagement() const; void reconnectLH(const Address&); //called by LinkRegistry // connection management (called by LinkRegistry) diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index 944cc7e838..b887364d51 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -129,11 +129,6 @@ void QueueFlowLimit::enqueued(const Message& msg) } if (flowStopped || !index.empty()) { - // ignore flow control if we are populating the queue due to cluster replication: - if (broker && broker->isClusterUpdatee()) { - QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence()); - return; - } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence()); msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes bool unique; diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp index 0874a6a28e..40ab40a90c 100644 --- a/cpp/src/qpid/broker/SessionHandler.cpp +++ b/cpp/src/qpid/broker/SessionHandler.cpp @@ -111,7 +111,7 @@ void SessionHandler::attachAs(const std::string& name) // Delay creating management object till attached(). In a cluster, // only the active link broker calls attachAs but all brokers // receive the subsequent attached() call. - session.reset(new SessionState(connection.getBroker(), *this, id, config, true)); + session.reset(new SessionState(connection.getBroker(), *this, id, config)); sendAttach(false); } diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index a1bfcd47e5..44a17de85f 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -53,14 +53,14 @@ namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( Broker& b, SessionHandler& h, const SessionId& id, - const SessionState::Configuration& config, bool delayManagement) + const SessionState::Configuration& config) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this), adapter(semanticState), asyncCommandCompleter(new AsyncCommandCompleter(this)) { - if (!delayManagement) addManagementObject(); + addManagementObject(); attach(h); } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index a60e75d192..7001c80a60 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -73,7 +73,7 @@ class SessionState : public qpid::SessionState, { public: SessionState(Broker&, SessionHandler&, const SessionId&, - const SessionState::Configuration&, bool delayManagement=false); + const SessionState::Configuration&); ~SessionState(); bool isAttached() const { return handler; } |
