summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-22 18:11:07 +0000
committerAlan Conway <aconway@apache.org>2012-05-22 18:11:07 +0000
commite9cf88573a6532010a70f28fec8869bc034fe16b (patch)
tree7cc00269c3a5a772ea119aedc5ae23720f26ee75 /qpid/cpp/src
parent1cba9546cc2c521d6ee4099bed9ff1eb0d09afd5 (diff)
downloadqpid-python-e9cf88573a6532010a70f28fec8869bc034fe16b.tar.gz
QPID-3603: HA backups pass identifying info to primary.
Pass hostname, management UUID and status in link connection arguments. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1341580 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h3
-rw-r--r--qpid/cpp/src/qpid/broker/System.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/System.h17
-rw-r--r--qpid/cpp/src/qpid/framing/Uuid.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp59
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h62
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp20
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp37
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h5
-rw-r--r--qpid/cpp/src/tests/Uuid.cpp5
13 files changed, 207 insertions, 28 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 8e4e9dae34..2ccb52e5de 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -628,6 +628,8 @@ if (BUILD_HA)
set (ha_SOURCES
qpid/ha/Backup.cpp
qpid/ha/Backup.h
+ qpid/ha/BrokerInfo.h
+ qpid/ha/BrokerInfo.cpp
qpid/ha/BrokerReplicator.cpp
qpid/ha/BrokerReplicator.h
qpid/ha/ConnectionExcluder.cpp
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 31f7bcc494..ae7b803b87 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -25,6 +25,8 @@ dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
+ qpid/ha/BrokerInfo.h \
+ qpid/ha/BrokerInfo.cpp \
qpid/ha/BrokerReplicator.cpp \
qpid/ha/BrokerReplicator.h \
qpid/ha/ConnectionExcluder.cpp \
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 72ed05aacb..c57e3c849c 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -384,6 +384,9 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Properties to be set on outgoing link connections */
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+
+ /** Information identifying this system */
+ boost::shared_ptr<const System> getSystem() const { return systemObject; }
};
}}
diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp
index 8cd2edda76..fa8df6406b 100644
--- a/qpid/cpp/src/qpid/broker/System.cpp
+++ b/qpid/cpp/src/qpid/broker/System.cpp
@@ -37,7 +37,6 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
if (agent != 0)
{
- framing::Uuid systemId;
if (_dataDir.empty ())
{
@@ -66,14 +65,13 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0)
}
mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array()));
- std::string sysname, nodename, release, version, machine;
- qpid::sys::SystemInfo::getSystemId (sysname,
- nodename,
+ qpid::sys::SystemInfo::getSystemId (osName,
+ nodeName,
release,
version,
machine);
- mgmtObject->set_osName (sysname);
- mgmtObject->set_nodeName (nodename);
+ mgmtObject->set_osName (osName);
+ mgmtObject->set_nodeName (nodeName);
mgmtObject->set_release (release);
mgmtObject->set_version (version);
mgmtObject->set_machine (machine);
diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h
index 0fc2c2bd88..6847c662ae 100644
--- a/qpid/cpp/src/qpid/broker/System.h
+++ b/qpid/cpp/src/qpid/broker/System.h
@@ -21,6 +21,7 @@
//
#include "qpid/management/Manageable.h"
+#include "qpid/framing/Uuid.h"
#include "qmf/org/apache/qpid/broker/System.h"
#include <boost/shared_ptr.hpp>
#include <string>
@@ -35,6 +36,8 @@ class System : public management::Manageable
private:
qmf::org::apache::qpid::broker::System* mgmtObject;
+ framing::Uuid systemId;
+ std::string osName, nodeName, release, version, machine;
public:
@@ -44,6 +47,20 @@ class System : public management::Manageable
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
+
+
+ /** Persistent UUID assigned by the management system to this broker. */
+ framing::Uuid getSystemId() const { return systemId; }
+ /** Returns the OS name; e.g., GNU/Linux or Windows */
+ std::string getOsName() const { return osName; }
+ /** Returns the node name. Usually the same as the host name. */
+ std::string getNodeName() const { return nodeName; }
+ /** Returns the OS release identifier. */
+ std::string getRelease() const { return release; }
+ /** Returns the OS release version (kernel, build, sp, etc.) */
+ std::string getVersion() const { return version; }
+ /** Returns the hardware type. */
+ std::string getMachine() const { return machine; }
};
}}
diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp
index b3d1e2e1e4..e94e28ffee 100644
--- a/qpid/cpp/src/qpid/framing/Uuid.cpp
+++ b/qpid/cpp/src/qpid/framing/Uuid.cpp
@@ -43,6 +43,13 @@ Uuid::Uuid(const uint8_t* data) {
assign(data);
}
+Uuid::Uuid(const std::string& s) {
+ if (s.size() != UNPARSED_SIZE)
+ throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
+ if (uuid_parse(&s[0], c_array()) != 0)
+ throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s));
+}
+
void Uuid::assign(const uint8_t* data) {
// This const cast is for Solaris which has a
// uuid_copy that takes a non const 2nd argument
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 8b62f89f11..f00d8a8f4c 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -61,14 +61,14 @@ Url Backup::linkUrl(const Url& brokers) const {
Url url;
for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (!isSelf(*i)) url.push_back(*i);
- if (url.empty()) throw Url::Invalid("HA broker Link URL is empty");
- QPID_LOG(debug, logPrefix << "Link URL set to: " << url);
+ if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty");
+ QPID_LOG(debug, logPrefix << "Backup failover URL (excluding self): " << url);
return url;
}
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Backup initialized with broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Backup broker URL: " << brokers);
sys::Mutex::ScopedLock l(lock);
Url url = linkUrl(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
new file mode 100644
index 0000000000..59c58a68b5
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerInfo.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include <iostream>
+
+
+namespace qpid {
+namespace ha {
+
+namespace {
+std::string SYSTEM_ID="system-id";
+std::string HOST_NAME="host-name";
+std::string STATUS="status";
+}
+
+using framing::Uuid;
+using framing::FieldTable;
+
+FieldTable BrokerInfo::asFieldTable() const {
+ FieldTable ft;
+ ft.setString(SYSTEM_ID, systemId.str());
+ ft.setString(HOST_NAME, hostName);
+ ft.setInt(STATUS, status);
+ return ft;
+}
+
+void BrokerInfo::assign(const FieldTable& ft) {
+ systemId = Uuid(ft.getAsString(SYSTEM_ID));
+ hostName = ft.getAsString(HOST_NAME);
+ status = BrokerStatus(ft.getAsInt(STATUS));
+}
+
+std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
+ return o << b.getHostName() << "(" << b.getSystemId()
+ << "," << printable(b.getStatus()) << ")";
+}
+
+}}
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
new file mode 100644
index 0000000000..d72b6793ff
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -0,0 +1,62 @@
+#ifndef QPID_HA_BROKERINFO_H
+#define QPID_HA_BROKERINFO_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Enum.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/framing/FieldTable.h"
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Information about a cluster broker, maintained by the cluster primary.
+ */
+class BrokerInfo
+{
+ public:
+ BrokerInfo(const std::string& host, const framing::Uuid& id) :
+ hostName(host), systemId(id) {}
+
+ BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
+ framing::FieldTable asFieldTable() const;
+ void assign(const framing::FieldTable&);
+
+ framing::Uuid getSystemId() const { return systemId; }
+ std::string getHostName() const { return hostName; }
+ BrokerStatus getStatus() const { return status; }
+ void setStatus(BrokerStatus s) { status = s; }
+
+ private:
+ std::string hostName;
+ framing::Uuid systemId;
+ BrokerStatus status;
+};
+
+std::ostream& operator<<(std::ostream&, const BrokerInfo&);
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKERINFO_H*/
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index fef4c67174..6c413982f8 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -20,6 +20,8 @@
*/
#include "ConnectionExcluder.h"
+#include "BrokerInfo.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Connection.h"
#include <boost/function.hpp>
#include <sstream>
@@ -37,18 +39,16 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
<< connection.getMgmtId());
return;
}
- if (connection.getClientProperties().isSet(BACKUP_TAG)) {
- if (backupAllowed) {
- QPID_LOG(debug, logPrefix << "Allowing backup connection: "
- << connection.getMgmtId());
- return;
- }
- else QPID_LOG(debug, logPrefix << "Rejected backup connection: "
- << connection.getMgmtId());
+ framing::FieldTable ft;
+ if (connection.getClientProperties().getTable(BACKUP_TAG, ft)) {
+ BrokerInfo info(ft);
+ QPID_LOG(debug, logPrefix << "Backup connection " << info <<
+ (backupAllowed ? " allowed" : " rejected"));
+ if (backupAllowed) return;
}
-
+ // Abort the connection.
throw Exception(
- QPID_MSG(logPrefix << "Rejected client connection " << connection.getMgmtId()));
+ QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId()));
}
const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 9caa96a607..ecaf968053 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -24,6 +24,7 @@
#include "Primary.h"
#include "Settings.h"
#include "ReplicatingSubscription.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
@@ -31,6 +32,7 @@
#include "qpid/broker/SignalHandler.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/sys/SystemInfo.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
@@ -51,7 +53,10 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
settings(s),
mgmtObject(0),
status(STANDALONE),
- excluder(new ConnectionExcluder(logPrefix))
+ excluder(new ConnectionExcluder(logPrefix)),
+ brokerInfo(broker.getSystem()->getNodeName(),
+ broker.getSystem()->getSystemId())
+
{
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -81,6 +86,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
statusChanged(l);
+
+ QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo);
}
HaBroker::~HaBroker() {}
@@ -267,15 +274,27 @@ void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) {
statusChanged(l);
}
-void HaBroker::statusChanged(sys::Mutex::ScopedLock&) {
+void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) {
mgmtObject->set_status(printable(status).str());
- // Set the backup-related properties for newly created links.
- framing::FieldTable ft = broker.getLinkClientProperties();
- if (isBackup(status))
- ft.setInt(ConnectionExcluder::BACKUP_TAG, 1);
- else
- ft.erase(ConnectionExcluder::BACKUP_TAG);
- broker.setLinkClientProperties(ft);
+ brokerInfo.setStatus(status);
+ setLinkProperties(l);
+}
+
+void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
+ framing::FieldTable linkProperties = broker.getLinkClientProperties();
+ if (isBackup(status)) {
+ // If this is a backup then any links we make are backup links
+ // and need to be tagged.
+ QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo);
+ linkProperties.setTable(ConnectionExcluder::BACKUP_TAG, brokerInfo.asFieldTable());
+ }
+ else {
+ // If this is a primary then any links are federation links
+ // and should not be tagged.
+ QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links");
+ linkProperties.erase(ConnectionExcluder::BACKUP_TAG);
+ }
+ broker.setLinkClientProperties(linkProperties);
}
void HaBroker::activatedBackup(const std::string& queue) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index ebd4836e71..017ccefd27 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -22,6 +22,7 @@
*
*/
+#include "BrokerInfo.h"
#include "Enum.h"
#include "LogPrefix.h"
#include "Settings.h"
@@ -89,6 +90,8 @@ class HaBroker : public management::Manageable
boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
+ const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
+
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
@@ -101,6 +104,7 @@ class HaBroker : public management::Manageable
void recover(sys::Mutex::ScopedLock&);
void activate(sys::Mutex::ScopedLock&);
void statusChanged(sys::Mutex::ScopedLock&);
+ void setLinkProperties(sys::Mutex::ScopedLock&);
std::vector<Url> getKnownBrokers() const;
@@ -118,6 +122,7 @@ class HaBroker : public management::Manageable
BrokerStatus status;
QueueNames activeBackups;
boost::shared_ptr<ConnectionExcluder> excluder;
+ BrokerInfo brokerInfo;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/Uuid.cpp b/qpid/cpp/src/tests/Uuid.cpp
index f85a297adc..d7d1f00e3c 100644
--- a/qpid/cpp/src/tests/Uuid.cpp
+++ b/qpid/cpp/src/tests/Uuid.cpp
@@ -52,6 +52,11 @@ boost::array<uint8_t, 16> sample = {{0x1b, 0x4e, 0x28, 0xba, 0x2f, 0xa1, 0x11,
const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb");
const string zeroStr("00000000-0000-0000-0000-000000000000");
+QPID_AUTO_TEST_CASE(testUuidStr) {
+ Uuid uuid(sampleStr);
+ BOOST_CHECK(uuid == sample);
+}
+
QPID_AUTO_TEST_CASE(testUuidIstream) {
Uuid uuid;
istringstream in(sampleStr);