summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-11 14:39:58 +0000
committerAlan Conway <aconway@apache.org>2010-05-11 14:39:58 +0000
commit16da0e0c511c0c1cf4ea592640c522754065200a (patch)
tree59fb80994db731fabe19897c95a6912e68716360 /cpp/src/qpid/broker
parentbe8e1bf6b6a0d760bddbbe6642d477a95f36ab42 (diff)
downloadqpid-python-16da0e0c511c0c1cf4ea592640c522754065200a.tar.gz
Support for multiple protocols in qpid::Url.
- simplified qpid::Address to hold (protocol,host,port) triples. - protocol plugins call Url:addProtocol to add tags to Url parser. - use Address::protocol when establishing connections. - ssl_test: tests using URL to connect. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943130 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--cpp/src/qpid/broker/Link.cpp12
-rw-r--r--cpp/src/qpid/broker/Link.h2
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp48
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h9
-rw-r--r--cpp/src/qpid/broker/RetryList.cpp12
-rw-r--r--cpp/src/qpid/broker/RetryList.h2
-rw-r--r--cpp/src/qpid/broker/windows/SslProtocolFactory.cpp2
8 files changed, 45 insertions, 47 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index df621fe4fb..09157c1e62 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -437,6 +437,7 @@ uint16_t Broker::getPort(const std::string& name) const {
void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) {
protocolFactories[name] = protocolFactory;
+ Url::addProtocol(name);
}
void Broker::accept() {
@@ -461,8 +462,8 @@ void Broker::connect(
sys::ConnectionCodec::Factory* f)
{
url.throwIfEmpty();
- const TcpAddress* addr=url[0].get<TcpAddress>();
- connect(addr->host, addr->port, TCP_TRANSPORT, failed, f);
+ const Address& addr=url[0];
+ connect(addr.host, addr.port, addr.protocol, failed, f);
}
uint32_t Broker::queueMoveMessages(
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 6db6fe7637..5a1dfd9656 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -307,11 +307,12 @@ void Link::maintenanceVisit ()
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
-void Link::reconnect(const qpid::TcpAddress& a)
+void Link::reconnect(const qpid::Address& a)
{
Mutex::ScopedLock mutex(lock);
host = a.host;
port = a.port;
+ transport = a.protocol;
startConnectionLH();
if (mgmtObject != 0) {
stringstream errorString;
@@ -322,11 +323,10 @@ void Link::reconnect(const qpid::TcpAddress& a)
bool Link::tryFailover()
{
- //TODO: urls only work for TCP at present, update when that has changed
- TcpAddress next;
- if (transport == Broker::TCP_TRANSPORT && urls.next(next) &&
- (next.host != host || next.port != port)) {
- links->changeAddress(TcpAddress(host, port), next);
+ Address next;
+ if (urls.next(next) &&
+ (next.host != host || next.port != port || next.protocol != transport)) {
+ links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Link failing over to " << host << ":" << port);
return true;
} else {
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 318eb5bd32..9da610076b 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -115,7 +115,7 @@ namespace qpid {
void established(); // Called when connection is created
void closed(int, std::string); // Called when connection goes away
void setConnection(Connection*); // Set pointer to the AMQP Connection
- void reconnect(const TcpAddress&); //called by LinkRegistry
+ void reconnect(const Address&); //called by LinkRegistry
string getAuthMechanism() { return authMechanism; }
string getUsername() { return username; }
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index f32587dd68..592b64449d 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -95,14 +95,14 @@ void LinkRegistry::periodicMaintenance ()
reMappings.clear();
}
-void LinkRegistry::changeAddress(const qpid::TcpAddress& oldAddress, const qpid::TcpAddress& newAddress)
+void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress)
{
//done on periodic maintenance thread; hold changes in separate
//map to avoid modifying the link map that is iterated over
reMappings[createKey(oldAddress)] = newAddress;
}
-bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::TcpAddress& newAddress)
+bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress)
{
std::string newKey = createKey(newAddress);
if (links.find(newKey) != links.end()) {
@@ -133,9 +133,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
{
Mutex::ScopedLock locker(lock);
- stringstream keystream;
- keystream << host << ":" << port;
- string key = string(keystream.str());
+ string key = createKey(host, port);
LinkMap::iterator i = links.find(key);
if (i == links.end())
@@ -168,12 +166,10 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
Mutex::ScopedLock locker(lock);
QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
- stringstream keystream;
- keystream << host << ":" << port;
- string linkKey = string(keystream.str());
-
- keystream << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = string(keystream.str());
+ string linkKey = createKey(host, port);
+ stringstream keystream;
+ keystream << linkKey << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = keystream.str();
LinkMap::iterator l = links.find(linkKey);
if (l == links.end())
@@ -210,9 +206,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
void LinkRegistry::destroy(const string& host, const uint16_t port)
{
Mutex::ScopedLock locker(lock);
- stringstream keystream;
- keystream << host << ":" << port;
- string key = string(keystream.str());
+ string key = createKey(host, port);
LinkMap::iterator i = links.find(key);
if (i != links.end())
@@ -231,16 +225,15 @@ void LinkRegistry::destroy(const std::string& host,
const std::string& key)
{
Mutex::ScopedLock locker(lock);
- stringstream keystream;
- keystream << host << ":" << port;
- string linkKey = string(keystream.str());
+ string linkKey = createKey(host, port);
+ stringstream keystream;
+ keystream << linkKey << "!" << src << "!" << dest << "!" << key;
+ string bridgeKey = keystream.str();
LinkMap::iterator l = links.find(linkKey);
if (l == links.end())
return;
- keystream << "!" << src << "!" << dest << "!" << key;
- string bridgeKey = string(keystream.str());
BridgeMap::iterator b = bridges.find(bridgeKey);
if (b == bridges.end())
return;
@@ -328,11 +321,18 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key)
}
-std::string LinkRegistry::createKey(const qpid::TcpAddress& a)
-{
- stringstream keystream;
- keystream << a.host << ":" << a.port;
- return string(keystream.str());
+std::string LinkRegistry::createKey(const qpid::Address& a) {
+ // TODO aconway 2010-05-11: key should also include protocol/transport to
+ // be unique. Requires refactor of LinkRegistry interface.
+ return createKey(a.host, a.port);
+}
+
+std::string LinkRegistry::createKey(const std::string& host, uint16_t port) {
+ // TODO aconway 2010-05-11: key should also include protocol/transport to
+ // be unique. Requires refactor of LinkRegistry interface.
+ stringstream keystream;
+ keystream << host << ":" << port;
+ return keystream.str();
}
void LinkRegistry::setPassive(bool p)
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 09a89298b6..52ab700cfc 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -53,7 +53,7 @@ namespace broker {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
- typedef std::map<std::string, TcpAddress> AddressMap;
+ typedef std::map<std::string, Address> AddressMap;
LinkMap links;
LinkMap linksToDestroy;
@@ -72,9 +72,10 @@ namespace broker {
std::string realm;
void periodicMaintenance ();
- bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
+ bool updateAddress(const std::string& oldKey, const Address& newAddress);
boost::shared_ptr<Link> findLink(const std::string& key);
- static std::string createKey(const TcpAddress& address);
+ static std::string createKey(const Address& address);
+ static std::string createKey(const std::string& host, uint16_t port);
public:
LinkRegistry (); // Only used in store tests
@@ -135,7 +136,7 @@ namespace broker {
/**
* Called by links failing over to new address
*/
- void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress);
+ void changeAddress(const Address& oldAddress, const Address& newAddress);
/**
* Called to alter passive state. In passive state the links
* and bridges managed by a link registry will be recorded and
diff --git a/cpp/src/qpid/broker/RetryList.cpp b/cpp/src/qpid/broker/RetryList.cpp
index 8f600c086d..b0477dd0f7 100644
--- a/cpp/src/qpid/broker/RetryList.cpp
+++ b/cpp/src/qpid/broker/RetryList.cpp
@@ -31,20 +31,16 @@ void RetryList::reset(const std::vector<Url>& u)
urlIndex = addressIndex = 0;//reset indices
}
-bool RetryList::next(TcpAddress& address)
+bool RetryList::next(Address& address)
{
while (urlIndex < urls.size()) {
- while (addressIndex < urls[urlIndex].size()) {
- const TcpAddress* tcp = urls[urlIndex][addressIndex++].get<TcpAddress>();
- if (tcp) {
- address = *tcp;
- return true;
- }
+ if (addressIndex < urls[urlIndex].size()) {
+ address = urls[urlIndex][addressIndex++];
+ return true;
}
urlIndex++;
addressIndex = 0;
}
-
urlIndex = addressIndex = 0;//reset indices
return false;
}
diff --git a/cpp/src/qpid/broker/RetryList.h b/cpp/src/qpid/broker/RetryList.h
index f87adf2c8d..242a7d2122 100644
--- a/cpp/src/qpid/broker/RetryList.h
+++ b/cpp/src/qpid/broker/RetryList.h
@@ -38,7 +38,7 @@ class RetryList
public:
QPID_BROKER_EXTERN RetryList();
QPID_BROKER_EXTERN void reset(const std::vector<Url>& urls);
- QPID_BROKER_EXTERN bool next(TcpAddress& address);
+ QPID_BROKER_EXTERN bool next(Address& address);
private:
std::vector<Url> urls;
size_t urlIndex;
diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
index 0db8fb5713..62122cbaa9 100644
--- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
+++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
@@ -52,7 +52,7 @@ struct SslServerOptions : qpid::Options
SslServerOptions() : qpid::Options("SSL Options"),
certStore("My"), port(5671), clientAuth(false)
{
- qpid::TcpAddress me;
+ qpid::Address me;
if (qpid::sys::SystemInfo::getLocalHostname(me))
certName = me.host;
else