summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-12-19 21:22:50 +0000
committerAlan Conway <aconway@apache.org>2012-12-19 21:22:50 +0000
commit5ba391c494eee906fe7023c211c2eb09d1ceffde (patch)
tree3890cc4bb71f2dd1cc6cf84a0f62fc056181de3c /cpp
parentb2a64a63bd87a6bd2f991e646804a802c34c28a0 (diff)
downloadqpid-python-5ba391c494eee906fe7023c211c2eb09d1ceffde.tar.gz
QPID-4514: Remove obsolete cluster code: Broker, Connection, Link.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1424125 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp20
-rw-r--r--cpp/src/qpid/broker/Broker.h16
-rw-r--r--cpp/src/qpid/broker/Connection.cpp25
-rw-r--r--cpp/src/qpid/broker/Connection.h17
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp2
-rw-r--r--cpp/src/qpid/broker/Link.cpp47
-rw-r--r--cpp/src/qpid/broker/Link.h1
-rw-r--r--cpp/src/qpid/broker/QueueFlowLimit.cpp5
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionState.h2
11 files changed, 27 insertions, 114 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 3a5cbb2e41..c1d26f2f5e 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -215,9 +215,6 @@ Broker::Broker(const Broker::Options& conf) :
*this),
queueCleaner(queues, &timer),
recoveryInProgress(false),
- recovery(true),
- inCluster(false),
- clusterUpdatee(false),
expiryPolicy(new ExpiryPolicy),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)),
deferDelivery(boost::bind(&Broker::deferDeliveryImpl, this, _1, _2))
@@ -289,18 +286,11 @@ Broker::Broker(const Broker::Options& conf) :
exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs());
if (store.get() != 0) {
- // The cluster plug-in will setRecovery(false) on all but the first
- // broker to join a cluster.
- if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager, protocolRegistry);
- recoveryInProgress = true;
- store->recover(recoverer);
- recoveryInProgress = false;
- }
- else {
- QPID_LOG(notice, "Cluster recovery: recovered journal data discarded and journal files pushed down");
- store->truncateInit(true); // save old files in subdir
- }
+ RecoveryManagerImpl recoverer(
+ queues, exchanges, links, dtxManager, protocolRegistry);
+ recoveryInProgress = true;
+ store->recover(recoverer);
+ recoveryInProgress = false;
}
//ensure standard exchanges exist (done after recovery from store)
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index c6cdc458af..bc35504372 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -184,8 +184,6 @@ class Broker : public sys::Runnable, public Plugin::Target,
const Message& msg);
std::string federationTag;
bool recoveryInProgress;
- bool recovery;
- bool inCluster, clusterUpdatee;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
ProtocolRegistry protocolRegistry;
@@ -282,22 +280,8 @@ class Broker : public sys::Runnable, public Plugin::Target,
static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
- void setRecovery(bool set) { recovery = set; }
- bool getRecovery() const { return recovery; }
bool inRecovery() const { return recoveryInProgress; }
- /** True of this broker is part of a cluster.
- * Only valid after early initialization of plugins is complete.
- */
- bool isInCluster() const { return inCluster; }
- void setInCluster(bool set) { inCluster = set; }
-
- /** True if this broker is joining a cluster and in the process of
- * receiving a state update.
- */
- bool isClusterUpdatee() const { return clusterUpdatee; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
-
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
/**
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 3cb30a82e3..add93c9c8f 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -85,13 +85,10 @@ Connection::Connection(ConnectionOutputHandler* out_,
const qpid::sys::SecuritySettings& external,
bool link_,
uint64_t objectId_,
- bool shadow_,
- bool delayManagement,
bool authenticated_
) :
ConnectionState(out_, broker_),
securitySettings(external),
- shadow(shadow_),
authenticated(authenticated_),
adapter(*this, link_),
link(link_),
@@ -106,11 +103,6 @@ Connection::Connection(ConnectionOutputHandler* out_,
{
outboundTracker.wrap(out);
broker.getConnectionObservers().connection(*this);
- // In a cluster, allow adding the management object to be delayed.
- if (!delayManagement) addManagementObject();
-}
-
-void Connection::addManagementObject() {
assert(agent == 0);
assert(mgmtObject == 0);
Manageable* parent = broker.GetVhostObject();
@@ -119,7 +111,6 @@ void Connection::addManagementObject() {
if (agent != 0) {
// TODO set last bool true if system connection
mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
- mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
@@ -277,20 +268,6 @@ void Connection::notifyConnectionForced(const string& text)
void Connection::setUserId(const string& userId)
{
ConnectionState::setUserId(userId);
- // In a cluster, the cluster code will raise the connect event
- // when the connection is replicated to the cluster.
- if (!broker.isInCluster()) raiseConnectEvent();
-}
-
-void Connection::raiseConnectEvent() {
- if (mgmtObject != 0) {
- mgmtObject->set_authIdentity(userId);
- agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId, mgmtObject->get_remoteProperties()));
- }
-
- QPID_LOG_CAT(debug, model, "Create connection. user:" << userId
- << " rhost:" << mgmtId );
-
}
void Connection::setUserProxyAuth(bool b)
@@ -488,7 +465,7 @@ void Connection::abort()
void Connection::setHeartbeatInterval(uint16_t heartbeat)
{
setHeartbeat(heartbeat);
- if (heartbeat > 0 && !isShadow()) {
+ if (heartbeat > 0) {
if (!heartbeatTimer) {
heartbeatTimer = new ConnectionHeartbeatTask(heartbeat, timer, *this);
timer.add(heartbeatTimer);
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 2f25b0e3f9..4bc8131f20 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -84,8 +84,6 @@ class Connection : public sys::ConnectionInputHandler,
const qpid::sys::SecuritySettings&,
bool isLink = false,
uint64_t objectId = 0,
- bool shadow=false,
- bool delayManagement = false,
bool authenticated=true);
~Connection ();
@@ -130,7 +128,6 @@ class Connection : public sys::ConnectionInputHandler,
void notifyConnectionForced(const std::string& text);
void setUserId(const std::string& uid);
- void raiseConnectEvent();
// credentials for connected client
const std::string& getUserId() const { return ConnectionState::getUserId(); }
@@ -153,18 +150,9 @@ class Connection : public sys::ConnectionInputHandler,
void sendClose();
void setSecureConnection(SecureConnection* secured);
- /** True if this is a shadow connection in a cluster. */
- bool isShadow() const { return shadow; }
-
/** True if this connection is authenticated */
bool isAuthenticated() const { return authenticated; }
- // Used by cluster to update connection status
- sys::AggregateOutput& getOutputTasks() { return outputTasks; }
-
- /** Cluster delays adding management object in the constructor then calls this. */
- void addManagementObject();
-
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
{
return securitySettings;
@@ -176,9 +164,6 @@ class Connection : public sys::ConnectionInputHandler,
bool isLink() { return link; }
void startLinkHeartbeatTimeoutTask();
- // Used by cluster during catch-up, see cluster::OutputInterceptor
- void doIoCallbacks();
-
void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
const framing::FieldTable& getClientProperties() const { return clientProperties; }
@@ -188,7 +173,6 @@ class Connection : public sys::ConnectionInputHandler,
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
- bool shadow;
bool authenticated;
ConnectionHandler adapter;
const bool link;
@@ -228,6 +212,7 @@ class Connection : public sys::ConnectionInputHandler,
OutboundFrameTracker outboundTracker;
void sent(const framing::AMQFrame& f);
+ void doIoCallbacks();
public:
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 9098c75f0b..bc77f53a9a 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -210,8 +210,6 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
ive = _args.get(qpidIVE);
if (ive) {
- if (broker && broker->isInCluster())
- throw framing::NotImplementedException("Cannot use Initial Value Exchanges in a cluster");
QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
}
}
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 0c18e08cd1..7a3551856b 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -209,9 +209,6 @@ void Link::setStateLH (int newState)
state = newState;
- if (hideManagement())
- return;
-
switch (state)
{
case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
@@ -237,8 +234,7 @@ void Link::startConnectionLH ()
QPID_LOG(error, "Link connection to " << host << ":" << port << " failed: "
<< e.what());
setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (e.what());
+ mgmtObject->set_lastError (e.what());
}
}
@@ -249,7 +245,7 @@ void Link::established(Connection* c)
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- if (!hideManagement() && agent)
+ if (agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
bool isClosing = false;
{
@@ -292,7 +288,7 @@ void Link::opened() {
Mutex::ScopedLock mutex(lock);
if (!connection) return;
- if (!hideManagement() && connection->GetManagementObject()) {
+ if (connection->GetManagementObject()) {
mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
}
@@ -354,13 +350,11 @@ void Link::closed(int, std::string text)
connection = 0;
- if (!hideManagement()) {
- mgmtObject->set_connectionRef(qpid::management::ObjectId());
- if (state == STATE_OPERATIONAL && agent) {
- stringstream addr;
- addr << host << ":" << port;
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
+ stringstream addr;
+ addr << host << ":" << port;
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
}
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
@@ -372,8 +366,7 @@ void Link::closed(int, std::string text)
if (state != STATE_FAILED && state != STATE_PASSIVE)
{
setStateLH(STATE_WAITING);
- if (!hideManagement())
- mgmtObject->set_lastError (text);
+ mgmtObject->set_lastError (text);
}
}
@@ -514,14 +507,13 @@ void Link::reconnectLH(const Address& a)
port = a.port;
transport = a.protocol;
- if (!hideManagement()) {
- stringstream errorString;
- errorString << "Failing over to " << a;
- mgmtObject->set_lastError(errorString.str());
- mgmtObject->set_host(host);
- mgmtObject->set_port(port);
- mgmtObject->set_transport(transport);
- }
+ stringstream errorString;
+ errorString << "Failing over to " << a;
+ mgmtObject->set_lastError(errorString.str());
+ mgmtObject->set_host(host);
+ mgmtObject->set_port(port);
+ mgmtObject->set_transport(transport);
+
startConnectionLH();
}
@@ -538,12 +530,6 @@ bool Link::tryFailoverLH() {
return false;
}
-// Management updates for a link are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
- return !mgmtObject || ( broker && broker->isInCluster());
-}
-
// Allocate channel from link free pool
framing::ChannelId Link::nextChannel()
{
@@ -585,8 +571,7 @@ void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- if (!hideManagement())
- mgmtObject->set_lastError(text);
+ mgmtObject->set_lastError(text);
}
void Link::setPersistenceId(uint64_t id) const
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 97511de08f..c8cd710f38 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -107,7 +107,6 @@ class Link : public PersistableConfig, public management::Manageable {
void destroy(); // Cleanup connection before link goes away
void ioThreadProcessing(); // Called on connection's IO thread by request
bool tryFailoverLH(); // Called during maintenance visit
- bool hideManagement() const;
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 944cc7e838..b887364d51 100644
--- a/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -129,11 +129,6 @@ void QueueFlowLimit::enqueued(const Message& msg)
}
if (flowStopped || !index.empty()) {
- // ignore flow control if we are populating the queue due to cluster replication:
- if (broker && broker->isClusterUpdatee()) {
- QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.getSequence());
- return;
- }
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.getSequence());
msg.getPersistentContext()->getIngressCompletion().startCompleter(); // don't complete until flow resumes
bool unique;
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 0874a6a28e..40ab40a90c 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -111,7 +111,7 @@ void SessionHandler::attachAs(const std::string& name)
// Delay creating management object till attached(). In a cluster,
// only the active link broker calls attachAs but all brokers
// receive the subsequent attached() call.
- session.reset(new SessionState(connection.getBroker(), *this, id, config, true));
+ session.reset(new SessionState(connection.getBroker(), *this, id, config));
sendAttach(false);
}
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index a1bfcd47e5..44a17de85f 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -53,14 +53,14 @@ namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
Broker& b, SessionHandler& h, const SessionId& id,
- const SessionState::Configuration& config, bool delayManagement)
+ const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this),
adapter(semanticState),
asyncCommandCompleter(new AsyncCommandCompleter(this))
{
- if (!delayManagement) addManagementObject();
+ addManagementObject();
attach(h);
}
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index a60e75d192..7001c80a60 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -73,7 +73,7 @@ class SessionState : public qpid::SessionState,
{
public:
SessionState(Broker&, SessionHandler&, const SessionId&,
- const SessionState::Configuration&, bool delayManagement=false);
+ const SessionState::Configuration&);
~SessionState();
bool isAttached() const { return handler; }