diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-14 00:47:39 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-14 00:47:39 +0000 |
| commit | 5fbda29971ec529d2fd147ffabdccb95eb4a6040 (patch) | |
| tree | 574c5d46385a19a790f34374b878cd9e5a6d2354 | |
| parent | 7a83a79c9367107cf9b8e550e00846c312e162ae (diff) | |
| download | qpid-python-5fbda29971ec529d2fd147ffabdccb95eb4a6040.tar.gz | |
QPID-3603: Bug fixes to HA code, passing test_failover_send_receive
- Updated HA logging messages to conform to new [Category] log format.
- QueueGuard fencepost error: set firstSafe to queue.getPosition() instead of queue.getPosition()+1.
- ReplicatingSubscription now passes the correct broker info (info rather than getBrokerInfo()) to getGuard().
- Don't remove RemoteBackups on disconnect: fixes a race where a connection is rejected while the broker is still a backup, but the close is only seen after it has become primary.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1350069 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | cpp/src/qpid/ha/Backup.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BackupConnectionExcluder.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/BrokerReplicator.cpp | 37 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ConnectionObserver.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/Primary.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueGuard.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/QueueReplicator.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/RemoteBackup.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/RemoteBackup.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/ha/ReplicatingSubscription.cpp | 4 |
12 files changed, 66 insertions, 39 deletions
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 1c48d1a4f1..8ed64c6767 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -45,7 +45,7 @@ using types::Variant; using std::string; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) + logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) { // Empty brokerUrl means delay initialization until seBrokertUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); diff --git a/cpp/src/qpid/ha/BackupConnectionExcluder.h b/cpp/src/qpid/ha/BackupConnectionExcluder.h index 6fd3a3ae09..0c66597961 100644 --- a/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -24,6 +24,7 @@ #include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" namespace qpid { namespace ha { @@ -36,7 +37,9 @@ class BackupConnectionExcluder : public broker::ConnectionObserver { public: void opened(broker::Connection& connection) { - throw Exception("HA backup rejected connection "+connection.getMgmtId()); + // FIXME aconway 2012-06-13: suppress caught error message, make this an info message. 
+ QPID_LOG(error, "Backup broker rejected connection "+connection.getMgmtId()); + throw Exception("Backup broker rejected connection "+connection.getMgmtId()); } void closed(broker::Connection&) {} diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index a99a8a3b91..d510d13b31 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& value) { BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()), + logPrefix("Backup configuration: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l) {} @@ -243,7 +243,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName); + qpid::Address primary; + link->getRemoteAddress(primary); + QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << queueName << ")"); } void BrokerReplicator::route(Deliverable& msg) { @@ -320,7 +322,7 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()); assert(result.second); // Should be true since we destroyed existing queue above - QPID_LOG(debug, logPrefix << "Queue declare event: " << name); + QPID_LOG(debug, logPrefix << "Queue declare event, starting replication: " << name); startQueueReplicator(result.first); } } @@ -466,8 +468,10 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); // It is normal for the queue to 
already exist if we are failing over. + QPID_LOG(debug, logPrefix << "Queue response, " + << (result.second ? "starting replication: " : "already replicated: ") + << name); if (result.second) startQueueReplicator(result.first); - QPID_LOG(debug, logPrefix << "Queue response: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { @@ -475,20 +479,17 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { if (!replicationTest.replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); - if (broker.createExchange( - values[NAME].asString(), - values[TYPE].asString(), - values[DURABLE].asBool(), - ""/*TODO: need to include alternate-exchange*/, - args, - ""/*TODO: who is the user?*/, - ""/*TODO: what should we use as connection id?*/).second) - { - QPID_LOG(debug, logPrefix << "Exchange response: " << values[NAME].asString()); - } else { - QPID_LOG(warning, logPrefix << "Exchange response, already exists: " << - values[NAME].asString()); - } + bool created = broker.createExchange( + values[NAME].asString(), + values[TYPE].asString(), + values[DURABLE].asBool(), + ""/*TODO: need to include alternate-exchange*/, + args, + ""/*TODO: who is the user?*/, + ""/*TODO: what should we use as connection id?*/).second; + QPID_LOG(debug, logPrefix << "Exchange response, " + << (created ? 
"created replica: " : "already exists: ") + << values[NAME].asString()); } namespace { diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp index 234a55ee2c..f666aa85a6 100644 --- a/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -30,7 +30,7 @@ namespace qpid { namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) - : haBroker(hb), logPrefix("HA connections: "), self(uuid) {} + : haBroker(hb), logPrefix("Connections: "), self(uuid) {} // FIXME aconway 2012-06-06: move to BrokerInfo bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { @@ -61,8 +61,12 @@ void ConnectionObserver::opened(broker::Connection& connection) { } BrokerInfo info; // Avoid self connections. if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) - throw Exception(QPID_MSG(logPrefix << "Rejected connection from self")); + if (info.getSystemId() == self) { + // FIXME aconway 2012-06-13: suppress caught error message, make this an info message. 
+ QPID_LOG(error, "HA broker rejected self connection "+connection.getMgmtId()); + throw Exception("HA broker rejected self connection "+connection.getMgmtId()); + } + } ObserverPtr o(getObserver()); if (o) o->opened(connection); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 9a359b75e7..dd30dd08d4 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -55,7 +55,7 @@ using types::Uuid; using sys::Mutex; HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : logPrefix("HA: "), + : logPrefix("Broker: "), broker(b), systemId(broker.getSystem()->getSystemId().data()), settings(s), diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index f4709511c8..57231032ff 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -64,7 +64,7 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("HA primary: "), active(false) + haBroker(hb), logPrefix("Primary: "), active(false) { assert(instance == 0); instance = this; // Let queue replicators find us. @@ -163,10 +163,14 @@ void Primary::closed(broker::Connection& connection) { BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { haBroker.getMembership().remove(info.getSystemId()); - QPID_LOG(debug, "HA primary: Backup disconnected: " << info); - backups.erase(info.getSystemId()); - // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. + QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); } + // NOTE: we do not modify backups here, we only add to the known backups set + // we never remove from it. + + // It is possible for a backup connection to be rejected while we are a backup, + // but the closed is seen when we have become primary. Removing the entry + // from backups in this case would be incorrect. 
} diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 12b5073747..3187d80438 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -86,6 +86,11 @@ class Primary std::string logPrefix; bool active; BackupSet initialBackups; + /** + * Backups is a map of all the remote backups we know about: any expected + * backups plus all actual backups that have connected. We do not remove + * entries when a backup disconnects. @see Primary::closed() + */ BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; boost::shared_ptr<broker::ConfigurationObserver> configurationObserver; diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index 40262a180c..93694af671 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -52,13 +52,13 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { std::ostringstream os; - os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": "; + os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); // Once we call addObserver we can get calls to enqueued and dequeued queue.addObserver(observer); // Must set after addObserver so we don't miss any enqueues. - firstSafe = queue.getPosition()+1; // Next message will be safe. 
+ firstSafe = queue.getPosition(); // FIXME aconway 2012-06-13: fencepost error } QueueGuard::~QueueGuard() { cancel(); } diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index af987a1e5e..c696050c12 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -63,12 +63,11 @@ QueueReplicator::QueueReplicator(const BrokerInfo& info, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), - logPrefix("HA backup of "+q->getName()+": "), + logPrefix("Backup queue "+q->getName()+": "), queue(q), link(l), brokerInfo(info) { Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - QPID_LOG(info, logPrefix << "Created"); } // This must be separate from the constructor so we can call shared_from_this. @@ -128,7 +127,11 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa // FIXME aconway 2012-05-22: use a finite credit window peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, logPrefix << "Subscribed: " << bridgeName); + + qpid::Address primary; + link->getRemoteAddress(primary); + QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); + QPID_LOG(trace, logPrefix << "Subscription settings: " << settings); } namespace { diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index 4f79537113..bcf7be2297 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -32,7 +32,7 @@ using sys::Mutex; RemoteBackup::RemoteBackup( const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) : - logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), + logPrefix("Primary remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), createGuards(cg) { QPID_LOG(debug, 
logPrefix << "Guarding queues for backup broker."); @@ -85,11 +85,13 @@ void RemoteBackup::ready(const QueuePtr& q) { if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } +// Called via ConfigurationObserver void RemoteBackup::queueCreate(const QueuePtr& q) { if (createGuards && replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); } +// Called via ConfigurationObserver void RemoteBackup::queueDestroy(const QueuePtr& q) { initialQueues.erase(q); GuardMap::iterator i = guards.find(q); diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h index 37177ec562..f2e46c8042 100644 --- a/cpp/src/qpid/ha/RemoteBackup.h +++ b/cpp/src/qpid/ha/RemoteBackup.h @@ -51,20 +51,25 @@ class RemoteBackup typedef boost::shared_ptr<QueueGuard> GuardPtr; typedef boost::shared_ptr<broker::Queue> QueuePtr; + /** Note: isReady() can be true after construction */ RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards); ~RemoteBackup(); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); - /** ReplicatingSubscription associated with queue is ready. */ + /** ReplicatingSubscription associated with queue is ready. + * Note: may set isReady() + */ void ready(const QueuePtr& queue); - // Called ConfigurationObserver + /** Called via ConfigurationObserver */ void queueCreate(const QueuePtr&); + + /** Called via ConfigurationObserver. Note: may set isReady() */ void queueDestroy(const QueuePtr&); - /**@return true when all initial queues for this backup are ready */ + /**@return true when all initial queues for this backup are ready. 
*/ bool isReady(); BrokerInfo getBrokerInfo() const { return brokerInfo; } diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 08f6fb7dcc..8213d6f5d5 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -201,7 +201,7 @@ ReplicatingSubscription::ReplicatingSubscription( // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "HA primary replica " << queue->getName() << "@" << info.getLogId() << ": "; + os << "Primary " << queue->getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); // FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing? @@ -217,7 +217,7 @@ ReplicatingSubscription::ReplicatingSubscription( // However we must attach the guard _before_ we scan for // initial dequeues to be sure we don't miss any dequeues // between the scan and attaching the guard. - if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); + if (Primary::get()) guard = Primary::get()->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); guard->attach(*this); |
