summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-14 00:47:39 +0000
committerAlan Conway <aconway@apache.org>2012-06-14 00:47:39 +0000
commit5fbda29971ec529d2fd147ffabdccb95eb4a6040 (patch)
tree574c5d46385a19a790f34374b878cd9e5a6d2354
parent7a83a79c9367107cf9b8e550e00846c312e162ae (diff)
downloadqpid-python-5fbda29971ec529d2fd147ffabdccb95eb4a6040.tar.gz
QPID-3603: Bug fixes to HA code, passing test_failover_send_receive
- Updated HA logging messages to conform to new [Category] log format. - QueueGuard fencepost error, set firstSafe correctly - ReplicatingSubscription passing correct broker info to getGuard() - Don't remove RemoteBackups on disconnect: fixes race where backup rejects connection but primary sees closed. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1350069 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--cpp/src/qpid/ha/BackupConnectionExcluder.h5
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp37
-rw-r--r--cpp/src/qpid/ha/ConnectionObserver.cpp10
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp2
-rw-r--r--cpp/src/qpid/ha/Primary.cpp12
-rw-r--r--cpp/src/qpid/ha/Primary.h5
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp4
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp9
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp4
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.h11
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp4
12 files changed, 66 insertions, 39 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 1c48d1a4f1..8ed64c6767 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -45,7 +45,7 @@ using types::Variant;
using std::string;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
{
// Empty brokerUrl means delay initialization until seBrokertUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
diff --git a/cpp/src/qpid/ha/BackupConnectionExcluder.h b/cpp/src/qpid/ha/BackupConnectionExcluder.h
index 6fd3a3ae09..0c66597961 100644
--- a/cpp/src/qpid/ha/BackupConnectionExcluder.h
+++ b/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -24,6 +24,7 @@
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
namespace qpid {
namespace ha {
@@ -36,7 +37,9 @@ class BackupConnectionExcluder : public broker::ConnectionObserver
{
public:
void opened(broker::Connection& connection) {
- throw Exception("HA backup rejected connection "+connection.getMgmtId());
+ // FIXME aconway 2012-06-13: suppress caught error message, make this an info message.
+ QPID_LOG(error, "Backup broker rejected connection "+connection.getMgmtId());
+ throw Exception("Backup broker rejected connection "+connection.getMgmtId());
}
void closed(broker::Connection&) {}
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index a99a8a3b91..d510d13b31 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()),
+ logPrefix("Backup configuration: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l)
{}
@@ -243,7 +243,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName);
+ qpid::Address primary;
+ link->getRemoteAddress(primary);
+ QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << queueName << ")");
}
void BrokerReplicator::route(Deliverable& msg) {
@@ -320,7 +322,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString());
assert(result.second); // Should be true since we destroyed existing queue above
- QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name);
startQueueReplicator(result.first);
}
}
@@ -466,8 +468,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
// It is normal for the queue to already exist if we are failing over.
+ QPID_LOG(debug, logPrefix << "Queue response, "
+ << (result.second ? "starting replication: " : "already replicated: ")
+ << name);
if (result.second) startQueueReplicator(result.first);
- QPID_LOG(debug, logPrefix << "Queue response: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -475,20 +479,17 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
if (!replicationTest.replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- if (broker.createExchange(
- values[NAME].asString(),
- values[TYPE].asString(),
- values[DURABLE].asBool(),
- ""/*TODO: need to include alternate-exchange*/,
- args,
- ""/*TODO: who is the user?*/,
- ""/*TODO: what should we use as connection id?*/).second)
- {
- QPID_LOG(debug, logPrefix << "Exchange response: " << values[NAME].asString());
- } else {
- QPID_LOG(warning, logPrefix << "Exchange response, already exists: " <<
- values[NAME].asString());
- }
+ bool created = broker.createExchange(
+ values[NAME].asString(),
+ values[TYPE].asString(),
+ values[DURABLE].asBool(),
+ ""/*TODO: need to include alternate-exchange*/,
+ args,
+ ""/*TODO: who is the user?*/,
+ ""/*TODO: what should we use as connection id?*/).second;
+ QPID_LOG(debug, logPrefix << "Exchange response, "
+ << (created ? "created replica: " : "already exists: ")
+ << values[NAME].asString());
}
namespace {
diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp
index 234a55ee2c..f666aa85a6 100644
--- a/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -30,7 +30,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("HA connections: "), self(uuid) {}
+ : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
// FIXME aconway 2012-06-06: move to BrokerInfo
bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
@@ -61,8 +61,12 @@ void ConnectionObserver::opened(broker::Connection& connection) {
}
BrokerInfo info; // Avoid self connections.
if (getBrokerInfo(connection, info)) {
- if (info.getSystemId() == self)
- throw Exception(QPID_MSG(logPrefix << "Rejected connection from self"));
+ if (info.getSystemId() == self) {
+ // FIXME aconway 2012-06-13: suppress caught error message, make this an info message.
+ QPID_LOG(error, "HA broker rejected self connection "+connection.getMgmtId());
+ throw Exception("HA broker rejected self connection "+connection.getMgmtId());
+ }
+
}
ObserverPtr o(getObserver());
if (o) o->opened(connection);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 9a359b75e7..dd30dd08d4 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -55,7 +55,7 @@ using types::Uuid;
using sys::Mutex;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix("HA: "),
+ : logPrefix("Broker: "),
broker(b),
systemId(broker.getSystem()->getSystemId().data()),
settings(s),
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index f4709511c8..57231032ff 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -64,7 +64,7 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("HA primary: "), active(false)
+ haBroker(hb), logPrefix("Primary: "), active(false)
{
assert(instance == 0);
instance = this; // Let queue replicators find us.
@@ -163,10 +163,14 @@ void Primary::closed(broker::Connection& connection) {
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
haBroker.getMembership().remove(info.getSystemId());
- QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
- backups.erase(info.getSystemId());
- // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
+ QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
}
+ // NOTE: we do not modify backups here, we only add to the known backups set
+ // we never remove from it.
+
+ // It is possible for a backup connection to be rejected while we are a backup,
+ // but the closed is seen when we have become primary. Removing the entry
+ // from backups in this case would be incorrect.
}
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 12b5073747..3187d80438 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -86,6 +86,11 @@ class Primary
std::string logPrefix;
bool active;
BackupSet initialBackups;
+ /**
+ * Backups is a map of all the remote backups we know about: any expected
+ * backups plus all actual backups that have connected. We do not remove
+ * entries when a backup disconnects. @see Primary::closed()
+ */
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index 40262a180c..93694af671 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -52,13 +52,13 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
: queue(q), subscription(0)
{
std::ostringstream os;
- os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
+ os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
// Once we call addObserver we can get calls to enqueued and dequeued
queue.addObserver(observer);
// Must set after addObserver so we don't miss any enqueues.
- firstSafe = queue.getPosition()+1; // Next message will be safe.
+ firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error
}
QueueGuard::~QueueGuard() { cancel(); }
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index af987a1e5e..c696050c12 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -63,12 +63,11 @@ QueueReplicator::QueueReplicator(const BrokerInfo& info,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- logPrefix("HA backup of "+q->getName()+": "),
+ logPrefix("Backup queue "+q->getName()+": "),
queue(q), link(l), brokerInfo(info)
{
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- QPID_LOG(info, logPrefix << "Created");
}
// This must be separate from the constructor so we can call shared_from_this.
@@ -128,7 +127,11 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
// FIXME aconway 2012-05-22: use a finite credit window
peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
- QPID_LOG(debug, logPrefix << "Subscribed: " << bridgeName);
+
+ qpid::Address primary;
+ link->getRemoteAddress(primary);
+ QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
+ QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
}
namespace {
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index 4f79537113..bcf7be2297 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -32,7 +32,7 @@ using sys::Mutex;
RemoteBackup::RemoteBackup(
const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) :
- logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt),
+ logPrefix("Primary remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt),
createGuards(cg)
{
QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
@@ -85,11 +85,13 @@ void RemoteBackup::ready(const QueuePtr& q) {
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
+// Called via ConfigurationObserver
void RemoteBackup::queueCreate(const QueuePtr& q) {
if (createGuards && replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
+// Called via ConfigurationObserver
void RemoteBackup::queueDestroy(const QueuePtr& q) {
initialQueues.erase(q);
GuardMap::iterator i = guards.find(q);
diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h
index 37177ec562..f2e46c8042 100644
--- a/cpp/src/qpid/ha/RemoteBackup.h
+++ b/cpp/src/qpid/ha/RemoteBackup.h
@@ -51,20 +51,25 @@ class RemoteBackup
typedef boost::shared_ptr<QueueGuard> GuardPtr;
typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ /** Note: isReady() can be true after construction */
RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
- /** ReplicatingSubscription associated with queue is ready. */
+ /** ReplicatingSubscription associated with queue is ready.
+ * Note: may set isReady()
+ */
void ready(const QueuePtr& queue);
- // Called ConfigurationObserver
+ /** Called via ConfigurationObserver */
void queueCreate(const QueuePtr&);
+
+ /** Called via ConfigurationObserver. Note: may set isReady() */
void queueDestroy(const QueuePtr&);
- /**@return true when all initial queues for this backup are ready */
+ /**@return true when all initial queues for this backup are ready. */
bool isReady();
BrokerInfo getBrokerInfo() const { return brokerInfo; }
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 08f6fb7dcc..8213d6f5d5 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -201,7 +201,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "HA primary replica " << queue->getName() << "@" << info.getLogId() << ": ";
+ os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
// FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing?
@@ -217,7 +217,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// However we must attach the guard _before_ we scan for
// initial dequeues to be sure we don't miss any dequeues
// between the scan and attaching the guard.
- if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
guard->attach(*this);