summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-19 21:23:01 +0000
committerAlan Conway <aconway@apache.org>2012-12-19 21:23:01 +0000
commit1696ae8b9125fe690a081fb2ada5f1d383edbd7b (patch)
treeb23f6594c9764b7e17df8d5578e5c553964a6bab /qpid/cpp/src
parentd03de2c595e74f101b63b89d1d5a99a22605157f (diff)
downloadqpid-python-1696ae8b9125fe690a081fb2ada5f1d383edbd7b.tar.gz
QPID-4514: Remove obsolete cluster code: DtxManager, more Broker, Connection, Link.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1424126 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h1
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DtxManager.h5
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp22
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp3
9 files changed, 2 insertions, 67 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index c1d26f2f5e..e5ab970a55 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -216,8 +216,7 @@ Broker::Broker(const Broker::Options& conf) :
queueCleaner(queues, &timer),
recoveryInProgress(false),
expiryPolicy(new ExpiryPolicy),
- getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
- deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
+ getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
try {
if (conf.enableMgmt) {
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index bc35504372..d2b946f71b 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -158,7 +158,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
- std::auto_ptr<sys::Timer> clusterTimer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -284,15 +283,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- /**
- * Never true in a stand-alone broker. In a cluster, return true
- * to defer delivery of messages deliveredg in a cluster-unsafe
- * context.
- *@return true if delivery of a message should be deferred.
- */
- boost::function<bool (const std::string& queue,
- const Message& msg)> deferDelivery;
-
bool isAuthenticating ( ) { return config.auth; }
bool isTimestamping() { return config.timestampRcvMsgs; }
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index add93c9c8f..ffc5bd413a 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -127,8 +127,6 @@ void Connection::requestIOProcessing(boost::function0<void> callback)
Connection::~Connection()
{
if (mgmtObject != 0) {
- // In a cluster, Connections destroyed during shutdown are in
- // a cluster-unsafe context. Don't raise an event in that case.
if (!link)
agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId(), mgmtObject->get_remoteProperties()));
QPID_LOG_CAT(debug, model, "Delete connection. user:" << ConnectionState::getUserId()
@@ -176,7 +174,6 @@ bool isMessage(const AMQMethodBody* method)
void Connection::recordFromServer(const framing::AMQFrame& frame)
{
- // Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
@@ -191,7 +188,6 @@ void Connection::recordFromServer(const framing::AMQFrame& frame)
void Connection::recordFromClient(const framing::AMQFrame& frame)
{
- // Don't record management stats in cluster-unsafe contexts
if (mgmtObject != 0)
{
qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
@@ -294,19 +290,6 @@ void Connection::close(connection::CloseCode code, const string& text)
getOutput().close();
}
-// Send a close to the client but keep the channels. Used by cluster.
-void Connection::sendClose() {
- if (heartbeatTimer)
- heartbeatTimer->cancel();
- if (timeoutTimer)
- timeoutTimer->cancel();
- if (linkHeartbeatTimer) {
- linkHeartbeatTimer->cancel();
- }
- adapter.close(connection::CLOSE_CODE_NORMAL, "OK");
- getOutput().close();
-}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
@@ -331,8 +314,6 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
void Connection::doIoCallbacks() {
if (!isOpen()) return; // Don't process IO callbacks until we are open.
ScopedLock<Mutex> l(ioCallbackLock);
- // Although IO callbacks execute in the connection thread context, they are
- // not cluster safe because they are queued for execution in non-IO threads.
while (!ioCallbacks.empty()) {
boost::function0<void> cb = ioCallbacks.front();
ioCallbacks.pop();
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 4bc8131f20..e418dd29bd 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -147,7 +147,6 @@ class Connection : public sys::ConnectionInputHandler,
f(*ptr_map_ptr(i));
}
- void sendClose();
void setSecureConnection(SecureConnection* secured);
/** True if this connection is authenticated */
diff --git a/qpid/cpp/src/qpid/broker/ConnectionState.h b/qpid/cpp/src/qpid/broker/ConnectionState.h
index 5ca037a0e9..331d3dcad8 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionState.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionState.h
@@ -46,7 +46,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
framemax(65535),
heartbeat(0),
heartbeatmax(120),
- userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links & clustering)
+ userProxyAuth(false), // Can proxy msgs with non-matching auth ids when true (used by federation links)
federationLink(true),
isDefaultRealm(false)
{}
diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h
index 6f03189f66..8f76790720 100644
--- a/qpid/cpp/src/qpid/broker/DtxManager.h
+++ b/qpid/cpp/src/qpid/broker/DtxManager.h
@@ -68,11 +68,6 @@ public:
void setStore(TransactionalStore* store);
void setTimer(sys::Timer& t) { timer = &t; }
- // Used by cluster for replication.
- template<class F> void each(F f) const {
- for (WorkMap::const_iterator i = work.begin(); i != work.end(); ++i)
- f(*ptr_map_ptr(i));
- }
DtxWorkRecord* getWork(const std::string& xid);
bool exists(const std::string& xid);
static std::string convert(const framing::Xid& xid);
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 7a3551856b..4bc3c01271 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -763,28 +763,6 @@ namespace {
const std::string FAILOVER_INDEX("failover-index");
}
-void Link::getState(framing::FieldTable& state) const
-{
- state.clear();
- Mutex::ScopedLock mutex(lock);
- if (!url.empty()) {
- state.setString(FAILOVER_ADDRESSES, url.str());
- state.setInt(FAILOVER_INDEX, reconnectNext);
- }
-}
-
-void Link::setState(const framing::FieldTable& state)
-{
- Mutex::ScopedLock mutex(lock);
- if (state.isSet(FAILOVER_ADDRESSES)) {
- Url failovers(state.getAsString(FAILOVER_ADDRESSES));
- setUrl(failovers);
- }
- if (state.isSet(FAILOVER_INDEX)) {
- reconnectNext = state.getAsInt(FAILOVER_INDEX);
- }
-}
-
std::string Link::createName(const std::string& transport,
const std::string& host,
uint16_t port)
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index c8cd710f38..3cca4e1bb3 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -189,10 +189,6 @@ class Link : public PersistableConfig, public management::Manageable {
static const std::string exchangeTypeName;
static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name);
- // replicate internal state of this Link for clustering
- void getState(framing::FieldTable& state) const;
- void setState(const framing::FieldTable& state);
-
/** create a name for a link (if none supplied by user config) */
static std::string createName(const std::string& transport,
const std::string& host,
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 23285dd89b..40a3949437 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -237,9 +237,6 @@ void Queue::deliver(Message msg, TxBuffer* txn){
//'link' for whatever protocol is used; that would let protocol
//specific stuff be kept out the queue
- // Check for deferred delivery in a cluster.
- if (broker && broker->deferDelivery(name, msg))
- return;
if (broker::amqp_0_10::MessageTransfer::isImmediateDeliveryRequired(msg) && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg, 0);