summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Alan Conway <aconway@apache.org> 2012-06-12 21:20:07 +0000
committer Alan Conway <aconway@apache.org> 2012-06-12 21:20:07 +0000
commit 2c26294c60daa02e19189cbbd935e2441f2c541c (patch)
tree b62b1f4dc8ce0627d9fc28782772ee9f2e503147
parent bf69fd2f69325dd660454e6b6c8399c51cacea2c (diff)
download qpid-python-2c26294c60daa02e19189cbbd935e2441f2c541c.tar.gz
QPID-3603: Introduced RemoteBackup to track backup status.
The primary creates a RemoteBackup object for each connected or expected backup. On first being promoted, the new primary has a RemoteBackup for each of the known backups at the time of the failure. The RemoteBackup manages queue guards for its backup and tracks its readiness. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349540 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt4
-rw-r--r--qpid/cpp/src/ha.mk4
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h1
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h11
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp15
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp87
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h68
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h4
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp53
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp18
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h11
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp138
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h45
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h29
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp36
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h21
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp81
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h84
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp109
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h6
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp17
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h11
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp90
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.h88
-rw-r--r--qpid/cpp/src/tests/brokertest.py8
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py50
32 files changed, 597 insertions, 551 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index b83e8d9c6d..8933d60104 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -656,8 +656,8 @@ if (BUILD_HA)
qpid/ha/Settings.h
qpid/ha/Types.cpp
qpid/ha/Types.h
- qpid/ha/UnreadyQueueSet.cpp
- qpid/ha/UnreadyQueueSet.h
+ qpid/ha/RemoteBackup.cpp
+ qpid/ha/RemoteBackup.h
)
add_library (ha MODULE ${ha_SOURCES})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index dae924b6ea..2027a75706 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -51,8 +51,8 @@ ha_la_SOURCES = \
qpid/ha/ReplicationTest.cpp \
qpid/ha/ReplicationTest.h \
qpid/ha/Settings.h \
- qpid/ha/UnreadyQueueSet.cpp \
- qpid/ha/UnreadyQueueSet.h \
+ qpid/ha/RemoteBackup.cpp \
+ qpid/ha/RemoteBackup.h \
qpid/ha/types.cpp \
qpid/ha/types.h
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index f2cc2a3454..1c48d1a4f1 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -59,7 +59,8 @@ bool Backup::isSelf(const Address& a) const {
Url Backup::linkUrl(const Url& brokers) const {
return brokers;
/** FIXME aconway 2012-05-29: Problems with self-test, false positives.
- // linkUrl contains only the addresses of *other* brokers, not this one.
+ // linkUrl contains only the addresses of *
+ other* brokers, not this one.
Url url;
for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (!isSelf(*i)) url.push_back(*i);
diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index 9776fac016..6fd3a3ae09 100644
--- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -27,6 +27,7 @@
namespace qpid {
namespace ha {
+// FIXME aconway 2012-06-06: move to Backup.cpp
/**
* Exclude connections to a backup broker.
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 0c5de9542f..91d497ab43 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -26,7 +26,8 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
#include <iostream>
-
+#include <iterator>
+#include <sstream>
namespace qpid {
namespace ha {
@@ -43,7 +44,16 @@ using types::Variant;
using framing::FieldTable;
BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
- logId(id.str().substr(0,9)+"..."), hostName(host), port(port_), systemId(id) {}
+ hostName(host), port(port_), systemId(id)
+{
+ updateLogId();
+}
+
+void BrokerInfo::updateLogId() {
+ std::ostringstream o;
+ o << hostName << ":" << port;
+ logId = o.str();
+}
FieldTable BrokerInfo::asFieldTable() const {
Variant::Map m = asMap();
@@ -81,11 +91,29 @@ void BrokerInfo::assign(const Variant::Map& m) {
hostName = get(m, HOST_NAME).asString();
port = get(m, PORT).asUint16();
status = BrokerStatus(get(m, STATUS).asUint8());
+ updateLogId();
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId()
- << "," << printable(b.getStatus()) << ")";
+ return o << b.getHostName() << ":" << b.getPort() << "("
+ << printable(b.getStatus()) << ")";
+ // FIXME aconway 2012-06-06: include << b.getSystemId()?
+}
+
+std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {
+ std::ostream_iterator<BrokerInfo> out(o, " ");
+ copy(infos.begin(), infos.end(), out);
+ return o;
+}
+
+std::ostream& operator<<(std::ostream& o, const BrokerInfo::Map::value_type& v) {
+ return o << v.second;
+}
+
+std::ostream& operator<<(std::ostream& o, const BrokerInfo::Map& infos) {
+ std::ostream_iterator<BrokerInfo::Map::value_type> out(o, " ");
+ copy(infos.begin(), infos.end(), out);
+ return o;
}
}}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index 55479df4b9..642f7c1361 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -29,6 +29,7 @@
#include "qpid/types/Variant.h"
#include <string>
#include <iosfwd>
+#include <vector>
namespace qpid {
namespace ha {
@@ -39,6 +40,9 @@ namespace ha {
class BrokerInfo
{
public:
+ typedef std::set<BrokerInfo> Set;
+ typedef std::map<types::Uuid, BrokerInfo> Map;
+
BrokerInfo() {}
BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
@@ -58,7 +62,11 @@ class BrokerInfo
void assign(const framing::FieldTable&);
void assign(const types::Variant::Map&);
+ // So it can be put in a set.
+ bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
+
private:
+ void updateLogId();
std::string logId;
std::string hostName;
uint16_t port;
@@ -67,6 +75,9 @@ class BrokerInfo
};
std::ostream& operator<<(std::ostream&, const BrokerInfo&);
+std::ostream& operator<<(std::ostream&, const BrokerInfo::Set&);
+std::ostream& operator<<(std::ostream&, const BrokerInfo::Map::value_type&);
+std::ostream& operator<<(std::ostream&, const BrokerInfo::Map&);
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 81667e0437..cbfd5b1f32 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -13,7 +13,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
@@ -292,7 +292,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
if (!replicationTest.isReplicated(
- values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool()))
+ CONFIGURATION,
+ values[ARGS].asMap(),
+ values[AUTODEL].asBool(),
+ values[EXCL].asBool()))
return;
if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
framing::FieldTable args;
@@ -441,9 +444,11 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
+ if (!replicationTest.isReplicated(
+ CONFIGURATION,
+ values[ARGUMENTS].asMap(),
+ values[AUTODELETE].asBool(),
+ values[EXCLUSIVE].asBool()))
return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index 677de31370..9788e4b647 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -94,7 +94,6 @@ class BrokerReplicator : public broker::Exchange,
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
void ready();
std::string logPrefix;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
deleted file mode 100644
index 61835b15d1..0000000000
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ConnectionExcluder.h"
-#include "BrokerInfo.h"
-#include "HaBroker.h"
-#include "qpid/framing/FieldTable.h"
-#include "qpid/broker/Connection.h"
-#include <boost/function.hpp>
-#include <sstream>
-
-namespace qpid {
-namespace ha {
-
-ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("HA: "), self(uuid) {}
-
-namespace {
-bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
- framing::FieldTable ft;
- if (connection.getClientProperties().getTable(ConnectionExcluder::BACKUP_TAG, ft)) {
- info = BrokerInfo(ft);
- return true;
- }
- return false;
-}
-}
-
-void ConnectionExcluder::opened(broker::Connection& connection) {
- if (connection.isLink()) return; // Allow all outgoing links
- if (connection.getClientProperties().isSet(ADMIN_TAG)) {
- QPID_LOG(debug, logPrefix << "Allowing admin connection: "
- << connection.getMgmtId());
- return;
- }
- BrokerStatus status = haBroker.getStatus();
- if (isBackup(status)) reject(connection);
- BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self) {
- QPID_LOG(debug, logPrefix << "Rejected self connection");
- reject(connection);
- }
- else {
- QPID_LOG(debug, logPrefix << "Allowed backup connection " << info);
- haBroker.getMembership().add(info);
- return;
- }
- }
- // else: Primary node accepts connections.
-}
-
-void ConnectionExcluder::reject(broker::Connection& connection) {
- throw Exception(
- QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId()));
-}
-
-void ConnectionExcluder::closed(broker::Connection& connection) {
- BrokerInfo info;
- BrokerStatus status = haBroker.getStatus();
- if (isBackup(status)) return; // Don't mess with the map received from primary.
- if (getBrokerInfo(connection, info))
- haBroker.getMembership().remove(info.getSystemId());
-}
-
-const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
-const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup";
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
deleted file mode 100644
index c24a138e2c..0000000000
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#ifndef QPID_HA_CONNECTIONEXCLUDER_H
-#define QPID_HA_CONNECTIONEXCLUDER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "types.h"
-#include "qpid/broker/ConnectionObserver.h"
-#include "qpid/types/Uuid.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/function.hpp>
-
-namespace qpid {
-
-namespace broker {
-class Connection;
-}
-
-namespace ha {
-class HaBroker;
-
-/**
- * Exclude normal connections to a backup broker.
- * Admin connections are identified by a special flag in client-properties
- * during connection negotiation.
- */
-class ConnectionExcluder : public broker::ConnectionObserver
-{
- public:
- static const std::string ADMIN_TAG;
- static const std::string BACKUP_TAG;
-
- ConnectionExcluder(HaBroker&, const types::Uuid& self);
-
- void opened(broker::Connection& connection);
- void closed(broker::Connection& connection);
-
- void setStatus(BrokerStatus);
-
- private:
- void reject(broker::Connection&);
-
- HaBroker& haBroker;
- std::string logPrefix;
- types::Uuid self;
-};
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 694a253fc3..d121aa1191 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -21,6 +21,7 @@
#include "ConnectionObserver.h"
#include "BrokerInfo.h"
+#include "HaBroker.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -28,9 +29,10 @@
namespace qpid {
namespace ha {
-ConnectionObserver::ConnectionObserver(const types::Uuid& uuid)
- : logPrefix("HA: "), self(uuid) {}
+ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
+ : haBroker(hb), logPrefix("HA: "), self(uuid) {}
+// FIXME aconway 2012-06-06: move to BrokerInfo
bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
@@ -58,13 +60,16 @@ void ConnectionObserver::opened(broker::Connection& connection) {
return; // No need to call observer, always allow admins.
}
BrokerInfo info; // Avoid self connections.
- if (getBrokerInfo(connection, info) && info.getSystemId() == self)
- throw Exception("HA rejected self connection");
+ if (getBrokerInfo(connection, info)) {
+ if (info.getSystemId() == self)
+ throw Exception("HA rejected self connection");
+ }
ObserverPtr o(getObserver());
if (o) o->opened(connection);
}
void ConnectionObserver::closed(broker::Connection& connection) {
+ BrokerInfo info;
ObserverPtr o(getObserver());
if (o) o->closed(connection);
}
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
index a950f41739..5c1dabe8f8 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -31,6 +31,7 @@
namespace qpid {
namespace ha {
class BrokerInfo;
+class HaBroker;
/**
* Observes connections, delegates to another ConnectionObserver for
@@ -52,7 +53,7 @@ class ConnectionObserver : public broker::ConnectionObserver
static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
- ConnectionObserver(const types::Uuid& self);
+ ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
void setObserver(const ObserverPtr&);
ObserverPtr getObserver();
@@ -62,6 +63,7 @@ class ConnectionObserver : public broker::ConnectionObserver
private:
sys::Mutex lock;
+ HaBroker& haBroker;
std::string logPrefix;
ObserverPtr observer;
types::Uuid self;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 046800791d..cfe202c6f7 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -23,7 +23,6 @@
#include "ConnectionObserver.h"
#include "HaBroker.h"
#include "Primary.h"
-#include "PrimaryConnectionMonitor.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
#include "qpid/amqp_0_10/Codecs.h"
@@ -53,6 +52,7 @@ using namespace management;
using namespace std;
using types::Variant;
using types::Uuid;
+using sys::Mutex;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: logPrefix("HA: "),
@@ -61,11 +61,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
settings(s),
mgmtObject(0),
status(STANDALONE),
- observer(new ConnectionObserver(systemId)),
+ observer(new ConnectionObserver(*this, systemId)),
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
- membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1, _2)),
+ membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)),
replicationTest(s.replicateDefault.get())
{
// Set up the management object.
@@ -95,7 +95,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// NOTE: lock is not needed in a constructor, but create one
// to pass to functions that have a ScopedLock parameter.
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
statusChanged(l);
@@ -108,28 +108,26 @@ HaBroker::~HaBroker() {
broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover(sys::Mutex::ScopedLock& l) {
+void HaBroker::recover(Mutex::ScopedLock&) {
setStatus(RECOVERING);
backup.reset(); // No longer replicating, close link.
- IdSet backups = membership.otherBackups();
+ BrokerInfo::Set backups = membership.otherBackups();
membership.reset(brokerInfo);
+ // Drop the lock, new Primary may call back on activate.
+ Mutex::ScopedUnlock u(lock);
primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
- observer->setObserver( // Allow connections
- boost::shared_ptr<broker::ConnectionObserver>(
- new PrimaryConnectionMonitor(*this)));
- if (primary->isActive()) activate(l);
}
// Called back from Primary active check.
void HaBroker::activate() {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
activate(l);
}
-void HaBroker::activate(sys::Mutex::ScopedLock&) { setStatus(ACTIVE); }
+void HaBroker::activate(Mutex::ScopedLock&) { setStatus(ACTIVE); }
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
switch (status) {
@@ -186,13 +184,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url, sys::Mutex::ScopedLock& l) {
+void HaBroker::setClientUrl(const Url& url, Mutex::ScopedLock& l) {
if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
clientUrl = url;
updateClientUrl(l);
}
-void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) {
+void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
if (url.empty()) throw Url::Invalid("HA client URL is empty");
mgmtObject->set_publicUrl(url.str());
@@ -201,7 +199,7 @@ void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
}
-void HaBroker::setBrokerUrl(const Url& url, sys::Mutex::ScopedLock& l) {
+void HaBroker::setBrokerUrl(const Url& url, Mutex::ScopedLock& l) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
@@ -220,12 +218,12 @@ void HaBroker::shutdown() {
}
BrokerStatus HaBroker::getStatus() const {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
return status;
}
void HaBroker::setStatus(BrokerStatus newStatus) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
setStatus(newStatus, l);
}
@@ -249,7 +247,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
}
} // namespace
-void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) {
+void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
QPID_LOG(notice, logPrefix << "Status change: "
<< printable(status) << " -> " << printable(newStatus));
bool legal = checkTransition(status, newStatus);
@@ -263,22 +261,19 @@ void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) {
statusChanged(l);
}
-void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) {
+void HaBroker::statusChanged(Mutex::ScopedLock& l) {
mgmtObject->set_status(printable(status).str());
brokerInfo.setStatus(status);
setLinkProperties(l);
}
-void HaBroker::membershipUpdate(const Variant::List& brokers, const IdSet& otherBackups)
-{
- QPID_LOG(debug, logPrefix << "Membership update: " << brokers);
+void HaBroker::membershipUpdate(const Variant::List& brokers) {
+ // No lock, only calls thread-safe objects.
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
- sys::Mutex::ScopedLock l(lock);
- if (primary.get()) primary->getUnreadyQueueSet().setExpectedBackups(otherBackups);
}
-void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
+void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
if (isBackup(status)) {
// If this is a backup then any links we make are backup links
@@ -296,18 +291,18 @@ void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
}
void HaBroker::activatedBackup(const std::string& queue) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
activeBackups.insert(queue);
}
void HaBroker::deactivatedBackup(const std::string& queue) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
activeBackups.erase(queue);
}
// FIXME aconway 2012-05-31: strip out.
HaBroker::QueueNames HaBroker::getActiveBackups() const {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
return activeBackups;
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 80e8a6cc3d..d36789e565 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -95,7 +95,7 @@ class HaBroker : public management::Manageable
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
Membership& getMembership() { return membership; }
- void membershipUpdate(const types::Variant::List&, const IdSet&);
+ void membershipUpdate(const types::Variant::List&);
private:
void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 6c6961f094..92436c9e56 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -19,6 +19,9 @@
*
*/
#include "Membership.h"
+#include <boost/bind.hpp>
+#include <iostream>
+#include <iterator>
namespace qpid {
namespace ha {
@@ -77,23 +80,26 @@ types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
void Membership::update(sys::Mutex::ScopedLock& l) {
if (updateCallback) {
types::Variant::List list = asList(l);
- IdSet ids = otherBackups(l);
sys::Mutex::ScopedUnlock u(lock);
- updateCallback(list, ids);
+ // FIXME aconway 2012-06-06: messy: Make this a data object,
+ // move locking into HaBroker?
+ updateCallback(list);
}
+ QPID_LOG(debug, " HA: Membership update: " << brokers);
}
-IdSet Membership::otherBackups() const {
+BrokerInfo::Set Membership::otherBackups() const {
sys::Mutex::ScopedLock l(lock);
return otherBackups(l);
}
-IdSet Membership::otherBackups(sys::Mutex::ScopedLock&) const {
- IdSet result;
+BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Set result;
for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
- result.insert(i->second.getSystemId());
+ result.insert(i->second);
return result;
}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index bdb55425dc..6b88b6e5d7 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -30,6 +30,7 @@
#include <boost/function.hpp>
#include <set>
#include <vector>
+#include <iosfwd>
namespace qpid {
namespace ha {
@@ -40,9 +41,7 @@ namespace ha {
class Membership
{
public:
- typedef boost::function<void (const types::Variant::List&,
- const IdSet&) > UpdateCallback;
-
+ typedef boost::function<void (const types::Variant::List&) > UpdateCallback;
Membership(const types::Uuid& self_, UpdateCallback updateFn)
: self(self_), updateCallback(updateFn) {}
@@ -51,22 +50,24 @@ class Membership
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
/** Return IDs of all backups other than self */
- IdSet otherBackups() const;
+ BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
private:
typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
- IdSet otherBackups(sys::Mutex::ScopedLock&) const;
+ BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const;
types::Variant::List asList(sys::Mutex::ScopedLock&) const;
void update(sys::Mutex::ScopedLock&);
+ std::ostream& print(std::ostream& o, sys::Mutex::ScopedLock&) const;
mutable sys::Mutex lock;
types::Uuid self;
BrokerMap brokers;
UpdateCallback updateCallback;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_MEMBERSHIP_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 63cba14484..cd731fe732 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -19,11 +19,14 @@
*
*/
#include "Backup.h"
-#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "Primary.h"
#include "ReplicatingSubscription.h"
+#include "RemoteBackup.h"
+#include "ConnectionObserver.h"
+#include "qpid/assert.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/ConfigurationObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
@@ -32,31 +35,140 @@
namespace qpid {
namespace ha {
+using sys::Mutex;
+
+namespace {
+// No-op connection observer, allows all connections.
+class PrimaryConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ PrimaryConnectionObserver(Primary& p) : primary(p) {}
+ void opened(broker::Connection& c) { primary.opened(c); }
+ void closed(broker::Connection& c) { primary.closed(c); }
+ private:
+ Primary& primary;
+};
+
+class PrimaryConfigurationObserver : public broker::ConfigurationObserver
+{
+ public:
+ PrimaryConfigurationObserver(Primary& p) : primary(p) {}
+ void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
+ void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+ private:
+ Primary& primary;
+};
+
+} // namespace
+
Primary* Primary::instance = 0;
-Primary::Primary(HaBroker& hb, const IdSet& backups) :
- haBroker(hb), logPrefix("HA primary: "),
- unready(0), activated(false),
- queues(hb.getBroker(), hb.getReplicationTest(), backups)
+Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
+ haBroker(hb), logPrefix("HA primary: "), active(false)
{
assert(instance == 0);
instance = this; // Let queue replicators find us.
- if (backups.empty()) {
- QPID_LOG(debug, logPrefix << "Not waiting for backups");
- activated = true;
+ if (expect.empty()) {
+ QPID_LOG(debug, logPrefix << "No initial backups");
}
else {
- QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups);
+ QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect);
+ for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) {
+ boost::shared_ptr<RemoteBackup> backup(
+ new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest()));
+ backups[i->getSystemId()] = backup;
+ if (!backup->isReady()) initialBackups.insert(backup);
+ }
+ }
+
+ configurationObserver.reset(new PrimaryConfigurationObserver(*this));
+ haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
+
+ Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
+ checkReady(l);
+ // Allow client connections
+ connectionObserver.reset(new PrimaryConnectionObserver(*this));
+ haBroker.getObserver()->setObserver(connectionObserver);
+}
+
+Primary::~Primary() {
+ haBroker.getObserver()->setObserver(boost::shared_ptr<broker::ConnectionObserver>());
+ haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
+}
+
+void Primary::checkReady(Mutex::ScopedLock&) {
+ if (!active && initialBackups.empty()) {
+ active = true;
+ QPID_LOG(notice, logPrefix << "Active, all initial queues are safe.");
+ Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+ haBroker.activate();
+ }
+}
+
+void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
+ if (i != backups.end() && i->second->isReady()) {
+ initialBackups.erase(i->second);
+ checkReady(l);
}
}
void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) {
- activated = true;
- haBroker.activate();
- QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe.");
+ BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
+ if (i != backups.end()) {
+ i->second->ready(rs.getQueue());
+ checkReady(i, l);
+ }
+}
+
+// Called via ConfigurationObserver: let every known backup guard the new queue.
+void Primary::queueCreate(const QueuePtr& q) {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ // Only update the book-keeping inside the loop. checkReady(l) releases
+ // the lock around haBroker.activate(), so calling it here would allow
+ // opened()/closed() to modify backups while the iterator is still in use.
+ if (i->second->isReady()) initialBackups.erase(i->second);
+ }
+ checkReady(l); // Activate, if appropriate, once the iteration is over.
+}
+
+// Called via ConfigurationObserver: tell every known backup that the queue is gone.
+void Primary::queueDestroy(const QueuePtr& q) {
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueDestroy(q);
+ // Destroying its last unready initial queue makes a backup ready: stop
+ // waiting for it, otherwise checkReady() below never sees an empty set.
+ if (i->second->isReady()) initialBackups.erase(i->second);
+ }
+ checkReady(l);
+}
+
+// Called via ConnectionObserver. A connection that carries broker info is
+// added to the membership and, if not already known, gets a RemoteBackup.
+void Primary::opened(broker::Connection& connection) {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo info;
+ if (!ha::ConnectionObserver::getBrokerInfo(connection, info)) return;
+ haBroker.getMembership().add(info);
+ if (backups.find(info.getSystemId()) != backups.end()) {
+ QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
+ return;
+ }
+ QPID_LOG(debug, logPrefix << "New backup connected: " << info);
+ backups[info.getSystemId()].reset(
+ new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest()));
+}
+
+// Called via ConnectionObserver. A connection that carries broker info is
+// removed from the membership and its RemoteBackup is dropped from backups.
+void Primary::closed(broker::Connection& connection) {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ haBroker.getMembership().remove(info.getSystemId());
+ // Use logPrefix like every other log statement of this class.
+ QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
+ backups.erase(info.getSystemId());
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
 }
}
+
+// Hand over the guard that the backup identified by info holds for queue q.
+// Returns an empty pointer if that backup is not known.
+boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info)
+{
+ // backups is modified by opened()/closed() under the lock, so reading it
+ // here needs the lock as well.
+ Mutex::ScopedLock l(lock);
+ BackupMap::iterator i = backups.find(info.getSystemId());
+ return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 3a1a9be9e8..d9a4eb365c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -22,8 +22,8 @@
*
*/
-#include "UnreadyQueueSet.h"
#include "types.h"
+#include "BrokerInfo.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <map>
@@ -33,38 +33,63 @@ namespace qpid {
namespace broker {
class Queue;
+class Connection;
+class ConnectionObserver;
+class ConfigurationObserver;
}
namespace ha {
class HaBroker;
class ReplicatingSubscription;
+class RemoteBackup;
+class QueueGuard;
/**
- * State associated with a primary broker. Tracks replicating
- * subscriptions to determine when primary is active.
+ * State associated with a primary broker:
+ * - tracks readiness of initial backups to determine when primary is active.
+ * - sets and updates queue guards on new queues for each backup.
*
- * THREAD SAFE: readyReplica is called in arbitray threads.
+ * THREAD SAFE: readyReplica and ConfigurationObserver functions called concurrently.
*/
+
class Primary
{
public:
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+
static Primary* get() { return instance; }
- Primary(HaBroker& hb, const IdSet& expectedBackups);
+ Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
+ ~Primary();
void readyReplica(const ReplicatingSubscription&);
void removeReplica(const std::string& q);
- UnreadyQueueSet& getUnreadyQueueSet() { return queues; }
- bool isActive() { return activated; }
+ // Called via ConfigurationObserver
+ void queueCreate(const QueuePtr&);
+ void queueDestroy(const QueuePtr&);
+
+ // Called via ConnectionObserver
+ void opened(broker::Connection& connection);
+ void closed(broker::Connection& connection);
+
+ boost::shared_ptr<QueueGuard> getGuard(const QueuePtr& q, const BrokerInfo&);
private:
+ typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap;
+ typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet;
+
+ void checkReady(sys::Mutex::ScopedLock&);
+ void checkReady(BackupMap::iterator, sys::Mutex::ScopedLock&);
+
sys::Mutex lock;
HaBroker& haBroker;
std::string logPrefix;
- size_t expected, unready;
- bool activated;
- UnreadyQueueSet queues;
+ bool active;
+ BackupSet initialBackups;
+ BackupMap backups;
+ boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
+ boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
static Primary* instance;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
index 1aa61b2dea..b453d64ae6 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
+++ b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
@@ -44,32 +44,17 @@ class HaBroker;
*
* THREAD SAFE: has no state, just mediates between other thread-safe objects.
*/
+// FIXME aconway 2012-06-06: rename observer
class PrimaryConnectionMonitor : public broker::ConnectionObserver
{
public:
- PrimaryConnectionMonitor(HaBroker& hb) : haBroker(hb) {}
+ PrimaryConnectionMonitor(Primary& p) : primary(p) {}
+ void opened(broker::Connection& connection) { primary.opened(connection); }
+ void closed(broker::Connection& connection) { primary.closed(connection); }
- void opened(broker::Connection& connection) {
- BrokerInfo info;
- if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- QPID_LOG(debug, "HA primary: Backup connected: " << info);
- haBroker.getMembership().add(info);
- // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
- }
- }
-
- void closed(broker::Connection& connection) {
- BrokerInfo info;
- if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
- haBroker.getMembership().remove(info.getSystemId());
- // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
- }
- }
- private:
- void reject(broker::Connection&);
- HaBroker& haBroker;
- };
+ private:
+ Primary& primary;
+};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 55dc6b0d50..5ea4c8e6f8 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -22,6 +22,7 @@
#include "ReplicatingSubscription.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/log/Statement.h"
#include <sstream>
@@ -31,19 +32,33 @@ namespace ha {
using namespace broker;
using sys::Mutex;
+// Adapter registered with the queue on behalf of a QueueGuard: forwards
+// enqueued() and dequeued() to the guard and ignores the other callbacks.
+class QueueGuard::QueueObserver : public broker::QueueObserver
+{
+ public:
+ QueueObserver(QueueGuard& g) : guard(g) {}
+ void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); }
+ void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); }
+ // Not forwarded to the guard.
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+ private:
+ QueueGuard& guard; // Held by reference, not owned.
+};
+
+
+
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
// Set a log prefix message that identifies the remote broker.
std::ostringstream os;
- os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": ";
+ os << "HA guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
+ observer.reset(new QueueObserver(*this));
+ queue.addObserver(observer);
+ readyPosition = queue.getPosition(); // Must set after addObserver()
}
-void QueueGuard::initialize() {
- Mutex::ScopedLock l(lock);
- queue.addObserver(shared_from_this());
-}
+QueueGuard::~QueueGuard() { cancel(); }
void QueueGuard::enqueued(const QueuedMessage& qm) {
// Delay completion
@@ -69,7 +84,11 @@ void QueueGuard::dequeued(const QueuedMessage& qm) {
}
void QueueGuard::cancel() {
- queue.removeObserver(shared_from_this());
+ queue.removeObserver(observer);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ if (delayed.empty()) return; // No need if no delayed messages.
+ }
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
@@ -94,6 +113,11 @@ void QueueGuard::complete(const QueuedMessage& qm) {
complete(qm, l);
}
+/** Return the stored ready position of this guard. */
+framing::SequenceNumber QueueGuard::getReadyPosition() {
+ // No lock, readyPosition is immutable.
+ return readyPosition;
+}
+
// FIXME aconway 2012-06-04: TODO support for timeout.
}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 739c1e0e13..bb98d2052d 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -23,13 +23,11 @@
*/
#include "types.h"
-#include "qpid/broker/QueueObserver.h"
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
#include <deque>
#include <set>
@@ -58,16 +56,10 @@ class ReplicatingSubscription;
* arbitrary connection threads, and from ReplicatingSubscription
* in the subscriptions thread.
*/
-class QueueGuard : public broker::QueueObserver,
- public boost::enable_shared_from_this<QueueGuard>
-{
+class QueueGuard {
public:
QueueGuard(broker::Queue& q, const BrokerInfo&);
-
- /** Must be called after ctor, requires a shared_ptr to this to exist.
- * Must be called before ReplicatingSubscription::initialize(this)
- */
- void initialize();
+ ~QueueGuard();
/** QueueObserver override. Delay completion of the message. */
void enqueued(const broker::QueuedMessage&);
@@ -83,16 +75,19 @@ class QueueGuard : public broker::QueueObserver,
void attach(ReplicatingSubscription&);
- // Unused QueueObserver functions.
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ /** The first sequence number that has been processed */
+ framing::SequenceNumber getReadyPosition();
private:
+ class QueueObserver;
+
sys::Mutex lock;
std::string logPrefix;
broker::Queue& queue;
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
+ boost::shared_ptr<QueueObserver> observer;
+ framing::SequenceNumber readyPosition;
void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&);
};
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 4d12015008..8b12231453 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -115,13 +115,13 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
FieldTable settings;
settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
+ settings.setInt(ReplicatingSubscription::QPID_BACK,
queue->getPosition());
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
brokerInfo.asFieldTable());
SequenceNumber front;
if (ReplicatingSubscription::getFront(*queue, front))
- settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
+ settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
peer.getMessage().subscribe(
args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
false/*exclusive*/, "", 0, settings);
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
new file mode 100644
index 0000000000..94e60d7ed8
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RemoteBackup.h"
+#include "QueueGuard.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace ha {
+
+using sys::Mutex;
+
+// Construct the record for one backup broker and visit every queue that is
+// currently in the broker's queue registry via initialQueue().
+RemoteBackup::RemoteBackup(
+ const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt) :
+ logPrefix("HA backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt)
+{
+ QPID_LOG(debug, logPrefix << "Guarding queues for backup broker. ");
+ broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
+}
+
+// True once no initial queue is left waiting for ready() or queueDestroy().
+bool RemoteBackup::isReady() {
+ return initialQueues.empty();
+}
+
+// Called by the constructor for each queue that already exists.
+// Only replicated queues are recorded as initial queues, matching the test
+// queueCreate() applies before it sets a guard. Recording a queue that is
+// not replicated would keep isReady() false until something calls ready()
+// or queueDestroy() for it, and no replication is expected for such a queue.
+void RemoteBackup::initialQueue(const QueuePtr& q) {
+ if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q);
+ queueCreate(q);
+}
+
+/** Hand over the guard held for queue q and remove it from this object.
+ * Returns an empty pointer when no guard is held for q, for example because
+ * the queue is not replicated or its guard was already handed out. The
+ * caller is expected to create a guard of its own in that case.
+ */
+RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
+ GuardPtr guard;
+ GuardMap::iterator i = guards.find(q);
+ if (i != guards.end()) {
+ guard = i->second;
+ guards.erase(i);
+ }
+ return guard;
+}
+
+// Queue q no longer counts as an unready initial queue for this backup.
+// Erasing a queue that is not in the set has no effect.
+void RemoteBackup::ready(const QueuePtr& q) {
+ initialQueues.erase(q);
+}
+
+// Set a guard on q for this backup, but only if q is replicated.
+void RemoteBackup::queueCreate(const QueuePtr& q) {
+ if (!replicationTest.isReplicated(ALL, *q)) return;
+ QPID_LOG(debug, logPrefix << "Setting guard on " << q->getName());
+ guards[q].reset(new QueueGuard(*q, brokerInfo));
+}
+
+// Forget a destroyed queue: stop waiting for it and cancel its guard, if any.
+void RemoteBackup::queueDestroy(const QueuePtr& q) {
+ initialQueues.erase(q);
+ GuardMap::iterator found = guards.find(q);
+ if (found == guards.end()) return; // No guard was held for this queue.
+ found->second->cancel();
+ guards.erase(found);
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
new file mode 100644
index 0000000000..39020c9b7d
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -0,0 +1,84 @@
+#ifndef QPID_HA_REMOTEBACKUP_H
+#define QPID_HA_REMOTEBACKUP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicationTest.h"
+#include "BrokerInfo.h"
+#include "types.h"
+#include <set>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+class Queue;
+}
+
+namespace ha {
+class QueueGuard;
+
+/**
+ * Track readiness for a remote broker.
+ * Creates queue guards on behalf of the remote broker to keep
+ * queues safe till the ReplicatingSubscription is ready.
+ *
+ * THREAD UNSAFE: Caller must serialize.
+ */
+class RemoteBackup
+{
+ public:
+ typedef boost::shared_ptr<QueueGuard> GuardPtr;
+ typedef boost::shared_ptr<broker::Queue> QueuePtr;
+
+ /** Visits every queue currently registered with the broker. */
+ RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt);
+
+ /** Return guard associated with a queue. Used to create ReplicatingSubscription.
+ * The guard is removed from this object when it is returned.
+ */
+ GuardPtr guard(const QueuePtr&);
+
+ /** ReplicatingSubscription associated with queue is ready. */
+ void ready(const QueuePtr& queue);
+
+ // Called by Primary when it is notified of queue creation or destruction.
+ void queueCreate(const QueuePtr&);
+ void queueDestroy(const QueuePtr&);
+
+ /**@return true when all initial queues for this backup are ready */
+ bool isReady();
+
+ private:
+ typedef std::map<QueuePtr, GuardPtr> GuardMap;
+ typedef std::set<QueuePtr> QueueSet;
+
+ std::string logPrefix;
+ BrokerInfo brokerInfo;
+ ReplicationTest replicationTest;
+ GuardMap guards; // Guards not yet handed out via guard().
+ QueueSet initialQueues; // Queues seen at construction that are not yet ready.
+
+ // Called for each existing queue during construction.
+ void initialQueue(const QueuePtr&);
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_REMOTEBACKUP_H*/
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index f7bfe6fda0..7438c95bc2 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -37,11 +37,12 @@ namespace ha {
using namespace framing;
using namespace broker;
using namespace std;
+using sys::Mutex;
-const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
-const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
-const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
-const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info");
+const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
+const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
+const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
namespace {
const string DOLLAR("$");
@@ -61,7 +62,7 @@ class DequeueRemover
}
void operator()(const QueuedMessage& message) {
- if (message.position >= start && message.position <= end) {
+ if (message.position >= start && message.position <= end) {
//i.e. message is within the initial range and has not been dequeued,
//so remove it from the dequeues
dequeues.remove(message.position);
@@ -112,6 +113,11 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front)
return getNext(q, 0, front);
}
+/** Return true if queue q holds no messages.
+ * getFront() returns true when it finds a front message, so the queue is
+ * empty exactly when getFront() fails.
+ */
+bool ReplicatingSubscription::isEmpty(broker::Queue& q) {
+ SequenceNumber front;
+ return !getFront(q, front);
+}
+
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -131,9 +137,7 @@ ReplicatingSubscription::Factory::create(
rs.reset(new ReplicatingSubscription(
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo()));
- guard->initialize(); // Must call before ReplicatingSubscription::initialize
- rs->initialize(guard);
+ rs->initialize();
}
return rs;
}
@@ -153,23 +157,19 @@ struct QueueRange {
}
QueueRange(const framing::FieldTable args) {
- back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER);
+ back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
front = back+1;
- empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER);
+ empty = !args.isSet(ReplicatingSubscription::QPID_FRONT);
if (!empty) {
- front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER);
+ front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
if (back < front)
throw InvalidArgumentException("Invalid bounds for backup queue");
}
}
-
- /** Consumer position to start consuming from the front */
- SequenceNumber browserStart() { return front-1; }
};
ostream& operator<<(ostream& o, const QueueRange& qr) {
-
- if (qr.front > qr.back) return o << "empty(" << qr.back << ")";
+ if (qr.front > qr.back) return o << "[-" << qr.back << "]";
else return o << "[" << qr.front << "," << qr.back << "]";
}
@@ -222,14 +222,31 @@ ReplicatingSubscription::ReplicatingSubscription(
// Clear the backup queue and reset to start browsing at the
// front of the primary queue.
if (!backup.empty) dequeues.add(backup.front, backup.back);
- position = primary.browserStart();
+ position = primary.front - 1; // Start consuming from front.
}
- QPID_LOG(debug, logPrefix << "New backup subscription " << getName()
- << " backup range " << backup
- << " primary range " << primary
- << " position " << position
- << " dequeues " << dequeues);
+ QPID_LOG(debug, logPrefix << "Subscribed: "
+ << " backup" << backup
+ << " primary" << primary
+ << " position=" << position
+ << " dequeues=" << dequeues);
+
+ // Set the guard
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
+ if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ guard->attach(*this);
+
+ // Guard is active, dequeued can be called concurrently.
+ Mutex::ScopedLock l(lock);
+
+ // Set the ready position. All messages after this position have
+ // been seen by the guard.
+ readyPosition = guard->getReadyPosition();
+ if (position >= readyPosition || isEmpty(*getQueue()))
+ setReady(l);
+ else
+ QPID_LOG(debug, logPrefix << "Catching up from "
+ << position << " to " << readyPosition);
}
catch (const std::exception& e) {
throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
@@ -242,35 +259,17 @@ ReplicatingSubscription::~ReplicatingSubscription() {
}
// Called in subscription's connection thread when the subscription is created.
-void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) {
- sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
- // Attach to the guard.
- guard = g;
- guard->attach(*this);
+// Called separate from ctor because sending events requires
+// shared_from_this
+//
+void ReplicatingSubscription::initialize() {
+ Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
// Send initial dequeues and position to the backup.
// There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
-
- // Set the ready position. All messages after this position have
- // been seen by the guard.
- QueueRange range;
- {
- // Drop the lock, QueueRange will lock the queues message lock
- // which is also locked around calls to enqueued() and dequeued()
- sys::Mutex::ScopedUnlock u(lock);
- range = QueueRange(*getQueue());
- }
- readyPosition = range.back;
- if (range.empty || position >= readyPosition) {
- setReady(l);
- }
- else {
- QPID_LOG(debug, logPrefix << "Backup subscription catching up from "
- << position << " to " << readyPosition);
- }
}
// Message is delivered in the subscription's connection thread.
@@ -280,7 +279,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "Replicating " << qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
assert(position == qm.position);
// qm.position is the position of the newly enqueued qm on local queue.
// backupPosition is latest position on backup queue before enqueueing
@@ -299,7 +298,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Deliver the message
bool delivered = ConsumerImpl::deliver(qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
// If we have advanced to the initial position, the backup is ready.
if (qm.position >= readyPosition) setReady(l);
}
@@ -314,12 +313,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) {
+void ReplicatingSubscription::setReady(Mutex::ScopedLock&) {
if (ready) return;
ready = true;
// Notify Primary that a subscription is ready.
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
if (Primary::get()) Primary::get()->readyReplica(*this);
}
@@ -343,7 +342,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -353,7 +352,7 @@ void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
dequeues.clear();
buffer.reset();
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
}
}
@@ -367,7 +366,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
bool doComplete = false;
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
if (qm.position > position) doComplete = true;
}
@@ -378,7 +377,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
QPID_LOG(trace, logPrefix << "Sending position " << pos
@@ -388,7 +387,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::
pos.encode(buffer);
buffer.reset();
{
- sys::Mutex::ScopedUnlock u(lock);
+ Mutex::ScopedUnlock u(lock);
sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
}
}
@@ -428,7 +427,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
bool ReplicatingSubscription::doDispatch()
{
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
return ConsumerImpl::doDispatch();
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index e69a2159e6..9be8364117 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -73,8 +73,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
- static const std::string QPID_HIGH_SEQUENCE_NUMBER;
- static const std::string QPID_LOW_SEQUENCE_NUMBER;
+ static const std::string QPID_BACK;
+ static const std::string QPID_FRONT;
static const std::string QPID_BROKER_INFO;
// TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
@@ -111,7 +111,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
/** Initialization that must be done after construction because it
* requires a shared_ptr to this to exist. Will attach to guard
*/
- void initialize(const boost::shared_ptr<QueueGuard>& guard);
+ void initialize();
BrokerInfo getBrokerInfo() const { return info; }
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
index 1db101dc94..613ac0a8c6 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -52,18 +52,23 @@ namespace {
const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
}
-bool ReplicationTest::isReplicated(const Variant::Map& args, bool autodelete, bool exclusive) {
+bool ReplicationTest::isReplicated(
+ ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive)
+{
bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return replicateLevel(args) && !ignore;
+ return !ignore && replicateLevel(args) >= level;
}
-bool ReplicationTest::isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive) {
+bool ReplicationTest::isReplicated(
+ ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive)
+{
bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
- return replicateLevel(args) && !ignore;
+ return !ignore && replicateLevel(args) >= level;
}
-bool ReplicationTest::isReplicated(const broker::Queue& q) {
- return isReplicated(q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
+bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
+{
+ return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner());
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index 4851f34e84..50a41ccbc3 100644
--- a/qpid/cpp/src/qpid/ha/ReplicationTest.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -47,13 +47,18 @@ class ReplicationTest
ReplicationTest(ReplicateLevel replicateDefault_) :
replicateDefault(replicateDefault_) {}
+ // Return the simple replication level, accounting for defaults.
ReplicateLevel replicateLevel(const std::string& str);
ReplicateLevel replicateLevel(const framing::FieldTable& f);
ReplicateLevel replicateLevel(const types::Variant::Map& m);
- bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
- bool isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive);
- bool isReplicated(const broker::Queue&);
+ // Return true if replication for a queue is enabled at level or
+ // higher, taking account of all settings.
+ bool isReplicated(ReplicateLevel level,
+ const types::Variant::Map& args, bool autodelete, bool exclusive);
+ bool isReplicated(ReplicateLevel level,
+ const framing::FieldTable& args, bool autodelete, bool exclusive);
+ bool isReplicated(ReplicateLevel level, const broker::Queue&);
private:
ReplicateLevel replicateDefault;
};
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
deleted file mode 100644
index 279eb2c0e1..0000000000
--- a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "UnreadyQueueSet.h"
-#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/QueueRegistry.h"
-#include <boost/bind.hpp>
-#include <iostream>
-#include <algorithm>
-
-namespace qpid {
-namespace ha {
-
-using sys::Mutex;
-
-UnreadyQueueSet::UnreadyQueueSet(broker::Broker& broker, ReplicationTest rt, const IdSet& ids) :
- logPrefix("HA unsafe queues: "), replicationTest(rt), expected(ids),
- initializing(true), initialQueues(0)
-{
- if (!expected.empty()) {
- QPID_LOG(debug, logPrefix << "Recovering, waiting for backups: " << expected);
- broker.getQueues().eachQueue(boost::bind(&UnreadyQueueSet::queueCreate, this, _1));
- initialQueues = queues.size();
- }
- initializing = false;
-}
-
-void UnreadyQueueSet::setExpectedBackups(const IdSet& ids) {
- Mutex::ScopedLock l(lock);
- expected = ids;
-}
-
-bool UnreadyQueueSet::ready(const boost::shared_ptr<broker::Queue>& q, const types::Uuid& id) {
- Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Replicating subscription ready: " << id << " on "
- << q->getName());
- QueueMap::iterator i = queues.find(q);
- if (i != queues.end()) {
- // if (i->second.guard->ready(id)) {
- // QPID_LOG(debug, logPrefix << "Releasing guard on " << q->getName());
- // remove(i, l);
- if (i->second.initial) --initialQueues;
- // }
- }
- return initialQueues == 0;
-}
-
-void UnreadyQueueSet::queueCreate(const boost::shared_ptr<broker::Queue>& q) {
- Mutex::ScopedLock l(lock);
- if (replicationTest.isReplicated(*q) && !expected.empty()) {
- QPID_LOG(debug, logPrefix << "Guarding " << q->getName() << " for " << expected);
- // GuardPtr guard(new QueueGuard(*q, expected));
- // FIXME aconway 2012-06-05: q->addObserver(guard);
- queues[q] = Entry(initializing);//, guard);
- }
-}
-
-void UnreadyQueueSet::queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
- Mutex::ScopedLock l(lock);
- remove(queues.find(q), l);
-}
-
-void UnreadyQueueSet::remove(QueueMap::iterator i, sys::Mutex::ScopedLock&) {
- if (i != queues.end()) {
- QPID_LOG(debug, logPrefix << "Queue is safe: " << i->first->getName());
- // FIXME aconway 2012-06-05: i->first->removeObserver(i->second.guard);
- //i->second.guard->release();
- queues.erase(i);
- }
-}
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
deleted file mode 100644
index 0731282c2b..0000000000
--- a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
+++ /dev/null
@@ -1,88 +0,0 @@
-#ifndef QPID_HA_BROKERGUARD_H
-#define QPID_HA_BROKERGUARD_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReplicationTest.h"
-#include "types.h"
-#include "qpid/broker/ConfigurationObserver.h"
-#include "qpid/sys/Mutex.h"
-#include <set>
-#include <map>
-
-namespace qpid {
-
-namespace broker {
-class Broker;
-}
-
-namespace ha {
-class QueueGuard;
-
-/**
- * ConfigurationObserver that sets a QueueGuard on all existing/new queues
- * so they are safe until their ReplicatingSubscriptions catch up.
- *
- * THREAD SAFE: Called concurrently as a ConfigurationObserver and via ready()
- */
-class UnreadyQueueSet : public qpid::broker::ConfigurationObserver
-{
- public:
- /** Caller should call broker.getConfigurationObservers().add(shared_ptr(this)) */
- UnreadyQueueSet(broker::Broker&, ReplicationTest rt, const IdSet& expected);
-
- void setExpectedBackups(const IdSet&);
-
- /** Backup id is ready on queue.
- *@return true if all initial queuse are now ready.
- */
- bool ready(const boost::shared_ptr<broker::Queue>&, const types::Uuid& id);
-
- // ConfigurationObserver overrides
- void queueCreate(const boost::shared_ptr<broker::Queue>&);
- void queueDestroy(const boost::shared_ptr<broker::Queue>&);
-
- // FIXME aconway 2012-05-31: handle IdSet changes.
- private:
- typedef boost::shared_ptr<QueueGuard> GuardPtr;
- struct Entry {
- bool initial;
- // FIXME aconway 2012-06-05: GuardPtr guard;
- // Entry(bool i=false, GuardPtr g=GuardPtr()) : initial(i), guard(g) {}
- Entry(bool i=false) : initial(i) {}
- };
- typedef std::map<boost::shared_ptr<broker::Queue>, Entry> QueueMap;
-
- void remove(QueueMap::iterator i, sys::Mutex::ScopedLock&);
-
- sys::Mutex lock;
- std::string logPrefix;
- ReplicationTest replicationTest;
- IdSet expected;
- QueueMap queues;
- bool initializing;
- size_t initialQueues; // Count of initial queues still unready.
-};
-
-}} // namespace qpid::ha
-
-#endif /*!QPID_HA_BROKERGUARD_H*/
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index c32b7f2c96..6855ed03bb 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -241,15 +241,13 @@ def find_in_file(str, filename):
class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
+ _log_count = 0
def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
def find_log(self):
- self.log = "%s.log" % self.name
- i = 1
- while (os.path.exists(self.log)):
- self.log = "%s.%d.log" % (self.name, i)
- i += 1
+ self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+ Broker._log_count += 1
def get_log(self):
return os.path.abspath(self.log)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 86679611c4..e43d8bcb91 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,7 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
@@ -493,6 +493,8 @@ class ReplicationTests(BrokerTest):
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+ # Detach, don't close as there is a broken session
+ s.session.connection.detach()
def test_priority(self):
"""Verify priority queues replicate correctly"""
@@ -716,21 +718,24 @@ class LongTests(BrokerTest):
brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
- receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
- receiver.start()
- sender.start()
+ senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(10)]
+ receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(10)]
+ for r in receivers: r.start()
+ for s in senders: s.start()
# Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
+ assert retry(lambda: receivers[0].received > 100)
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
try:
while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- n = receiver.received
+ for s in senders: s.sender.assert_running()
+ for r in receivers: r.receiver.assert_running()
+ n = receivers[0].received
# FIXME aconway 2012-05-01: don't kill primary till it's active
# otherwise we can lose messages. When we implement non-promotion
# of catchup brokers we can make this stronger: wait only for
@@ -739,23 +744,22 @@ class LongTests(BrokerTest):
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
+ receivers[0].check() # Verify no exceptions
+ return receivers[0].received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
+ assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
except:
traceback.print_exc()
raise
finally:
- sender.stop()
- receiver.stop()
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
dead = []
for i in xrange(3):
if not brokers[i].is_running(): dead.append(i)
brokers.kill(i, False)
if dead: raise Exception("Brokers not running: %s"%dead)
-
class RecoveryTests(BrokerTest):
"""Tests for recovery after a failure."""
@@ -766,31 +770,37 @@ class RecoveryTests(BrokerTest):
cluster = HaCluster(self, 4);
# Wait for the primary to be ready
cluster[0].wait_status("active")
-
# Create a queue before the failure.
s1 = cluster.connect(0).session().sender("q1;{create:always}")
for b in cluster: b.wait_backup("q1")
for i in xrange(100): s1.send(str(i))
-
# Kill primary and 2 backups
for i in [0,1,2]: cluster.kill(i, False)
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
+ def trySync(s):
+ try:
+ s.sync(timeout=.1)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
# Create a queue after the failure
s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
# Verify that messages sent are not completed
for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ trySync(s1)
self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
self.assertEqual(s2.unsettled(), 100)
# Verify we can receive even if sending is on hold:
cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
+ trySync(s1)
self.assertEqual(s1.unsettled(), 100)
+ trySync(s2)
self.assertEqual(s2.unsettled(), 100)
self.assertEqual(cluster[3].ha_status(), "recovering")
cluster.restart(2)
@@ -801,6 +811,8 @@ class RecoveryTests(BrokerTest):
cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
cluster[3].wait_status("active"),
+ s1.session.connection.close()
+ s2.session.connection.close()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)