diff options
| author | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:01 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-12-19 21:23:01 +0000 |
| commit | 1696ae8b9125fe690a081fb2ada5f1d383edbd7b (patch) | |
| tree | b23f6594c9764b7e17df8d5578e5c553964a6bab /qpid/cpp/src | |
| parent | d03de2c595e74f101b63b89d1d5a99a22605157f (diff) | |
| download | qpid-python-1696ae8b9125fe690a081fb2ada5f1d383edbd7b.tar.gz | |
QPID-4514: Remove obsolete cluster code: DtxManager, more Broker, Connection, Link.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424126 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 10 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 19 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionState.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 22 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 3 |
9 files changed, 2 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c1d26f2f5e..e5ab970a55 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -216,8 +216,7 @@ Broker::Broker(const Broker::Options& conf) : queueCleaner(queues, &timer), recoveryInProgress(false), expiryPolicy(new ExpiryPolicy), - getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)), - deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2)) + getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { try { if (conf.enableMgmt) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index bc35504372..d2b946f71b 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -158,7 +158,6 @@ class Broker : public sys::Runnable, public Plugin::Target, const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; sys::Timer timer; - std::auto_ptr<sys::Timer> clusterTimer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -284,15 +283,6 @@ class Broker : public sys::Runnable, public Plugin::Target, management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } - /** - * Never true in a stand-alone broker. In a cluster, return true - * to defer delivery of messages deliveredg in a cluster-unsafe - * context. - *@return true if delivery of a message should be deferred. - */ - boost::function<bool (const std::string& queue, - const Message& msg)> deferDelivery; - bool isAuthenticating ( ) { return config.auth; } bool isTimestamping() { return config.timestampRcvMsgs; } diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index add93c9c8f..ffc5bd413a 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -127,8 +127,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback) Connection::~Connection() { if (mgmtObject != 0) { - // In a cluster, Connections destroyed during shutdown are in - // a cluster-unsafe context. Don't raise an event in that case. if (!link) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties())); QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId() @@ -176,7 +174,6 @@ bool isMessage(const AMQMethodBody* method) void Connection::recordFromServer(const framing::AMQFrame& frame) { - // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0) { qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); @@ -191,7 +188,6 @@ void Connection::recordFromServer(const framing::AMQFrame& frame) void Connection::recordFromClient(const framing::AMQFrame& frame) { - // Don't record management stats in cluster-unsafe contexts if (mgmtObject != 0) { qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics(); @@ -294,19 +290,6 @@ void Connection::close(connection::CloseCode code, const string& text) getOutput().close(); } -// Send a close to the client but keep the channels. Used by cluster. -void Connection::sendClose() { - if (heartbeatTimer) - heartbeatTimer->cancel(); - if (timeoutTimer) - timeoutTimer->cancel(); - if (linkHeartbeatTimer) { - linkHeartbeatTimer->cancel(); - } - adapter.close(connection::CLOSE_CODE_NORMAL, "OK"); - getOutput().close(); -} - void Connection::idleOut(){} void Connection::idleIn(){} @@ -331,8 +314,6 @@ void Connection::closed(){ // Physically closed, suspend open sessions. void Connection::doIoCallbacks() { if (!isOpen()) return; // Don't process IO callbacks until we are open. ScopedLock<Mutex> l(ioCallbackLock); - // Although IO callbacks execute in the connection thread context, they are - // not cluster safe because they are queued for execution in non-IO threads. while (!ioCallbacks.empty()) { boost::function0<void> cb = ioCallbacks.front(); ioCallbacks.pop(); diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 4bc8131f20..e418dd29bd 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -147,7 +147,6 @@ class Connection : public sys::ConnectionInputHandler, f(*ptr_map_ptr(i)); } - void sendClose(); void setSecureConnection(SecureConnection* secured); /** True if this connection is authenticated */ diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h index 5ca037a0e9..331d3dcad8 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionState.h +++ b/qpid/cpp/src/qpid/broker/ConnectionState.h @@ -46,7 +46,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable framemax(65535), heartbeat(0), heartbeatmax(120), - userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering) + userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links) federationLink(true), isDefaultRealm(false) {} diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 6f03189f66..8f76790720 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -68,11 +68,6 @@ public: void setStore(TransactionalStore* store); void setTimer(sys::Timer& t) { timer = &t; } - // Used by cluster for replication. - template<class F> void each(F f) const { - for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i) - f(*ptr_map_ptr(i)); - } DtxWorkRecord* getWork(const std::string& xid); bool exists(const std::string& xid); static std::string convert(const framing::Xid& xid); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 7a3551856b..4bc3c01271 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -763,28 +763,6 @@ namespace { const std::string FAILOVER_INDEX("failover-index"); } -void Link::getState(framing::FieldTable& state) const -{ - state.clear(); - Mutex::ScopedLock mutex(lock); - if (!url.empty()) { - state.setString(FAILOVER_ADDRESSES, url.str()); - state.setInt(FAILOVER_INDEX, reconnectNext); - } -} - -void Link::setState(const framing::FieldTable& state) -{ - Mutex::ScopedLock mutex(lock); - if (state.isSet(FAILOVER_ADDRESSES)) { - Url failovers(state.getAsString(FAILOVER_ADDRESSES)); - setUrl(failovers); - } - if (state.isSet(FAILOVER_INDEX)) { - reconnectNext = state.getAsInt(FAILOVER_INDEX); - } -} - std::string Link::createName(const std::string& transport, const std::string& host, uint16_t port) diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index c8cd710f38..3cca4e1bb3 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -189,10 +189,6 @@ class Link : public PersistableConfig, public management::Manageable { static const std::string exchangeTypeName; static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name); - // replicate internal state of this Link for clustering - void getState(framing::FieldTable& state) const; - void setState(const framing::FieldTable& state); - /** create a name for a link (if none supplied by user config) */ static std::string createName(const std::string& transport, const std::string& host, diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 23285dd89b..40a3949437 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -237,9 +237,6 @@ void Queue::deliver(Message msg, TxBuffer* txn){ //'link' for whatever protocol is used; that would let protocol //specific stuff be kept out the queue - // Check for deferred delivery in a cluster. - if (broker && broker->deferDelivery(name, msg)) - return; if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) { if (alternateExchange) { DeliverableMessage deliverable(msg, 0); |
