diff options
| author | Alan Conway <aconway@apache.org> | 2010-05-11 14:39:58 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-05-11 14:39:58 +0000 |
| commit | 16da0e0c511c0c1cf4ea592640c522754065200a (patch) | |
| tree | 59fb80994db731fabe19897c95a6912e68716360 /cpp/src/qpid/broker | |
| parent | be8e1bf6b6a0d760bddbbe6642d477a95f36ab42 (diff) | |
| download | qpid-python-16da0e0c511c0c1cf4ea592640c522754065200a.tar.gz | |
Support for multiple protocols in qpid::Url.
- simplified qpid::Address to hold (protocol,host,port) triples.
- protocol plugins call Url:addProtocol to add tags to Url parser.
- use Address::protocol when establishing connections.
- ssl_test: tests using URL to connect.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943130 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/RetryList.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/RetryList.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/windows/SslProtocolFactory.cpp | 2 |
8 files changed, 45 insertions, 47 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index df621fe4fb..09157c1e62 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -437,6 +437,7 @@ uint16_t Broker::getPort(const std::string& name) const { void Broker::registerProtocolFactory(const std::string& name, ProtocolFactory::shared_ptr protocolFactory) { protocolFactories[name] = protocolFactory; + Url::addProtocol(name); } void Broker::accept() { @@ -461,8 +462,8 @@ void Broker::connect( sys::ConnectionCodec::Factory* f) { url.throwIfEmpty(); - const TcpAddress* addr=url[0].get<TcpAddress>(); - connect(addr->host, addr->port, TCP_TRANSPORT, failed, f); + const Address& addr=url[0]; + connect(addr.host, addr.port, addr.protocol, failed, f); } uint32_t Broker::queueMoveMessages( diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 6db6fe7637..5a1dfd9656 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -307,11 +307,12 @@ void Link::maintenanceVisit () connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } -void Link::reconnect(const qpid::TcpAddress& a) +void Link::reconnect(const qpid::Address& a) { Mutex::ScopedLock mutex(lock); host = a.host; port = a.port; + transport = a.protocol; startConnectionLH(); if (mgmtObject != 0) { stringstream errorString; @@ -322,11 +323,10 @@ void Link::reconnect(const qpid::TcpAddress& a) bool Link::tryFailover() { - //TODO: urls only work for TCP at present, update when that has changed - TcpAddress next; - if (transport == Broker::TCP_TRANSPORT && urls.next(next) && - (next.host != host || next.port != port)) { - links->changeAddress(TcpAddress(host, port), next); + Address next; + if (urls.next(next) && + (next.host != host || next.port != port || next.protocol != transport)) { + links->changeAddress(Address(transport, host, port), next); QPID_LOG(debug, "Link failing over to " << host << ":" << port); return true; } else { diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 318eb5bd32..9da610076b 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -115,7 +115,7 @@ namespace qpid { void established(); // Called when connection is created void closed(int, std::string); // Called when connection goes away void setConnection(Connection*); // Set pointer to the AMQP Connection - void reconnect(const TcpAddress&); //called by LinkRegistry + void reconnect(const Address&); //called by LinkRegistry string getAuthMechanism() { return authMechanism; } string getUsername() { return username; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index f32587dd68..592b64449d 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -95,14 +95,14 @@ void LinkRegistry::periodicMaintenance () reMappings.clear(); } -void LinkRegistry::changeAddress(const qpid::TcpAddress& oldAddress, const qpid::TcpAddress& newAddress) +void LinkRegistry::changeAddress(const qpid::Address& oldAddress, const qpid::Address& newAddress) { //done on periodic maintenance thread; hold changes in separate //map to avoid modifying the link map that is iterated over reMappings[createKey(oldAddress)] = newAddress; } -bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::TcpAddress& newAddress) +bool LinkRegistry::updateAddress(const std::string& oldKey, const qpid::Address& newAddress) { std::string newKey = createKey(newAddress); if (links.find(newKey) != links.end()) { @@ -133,9 +133,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, { Mutex::ScopedLock locker(lock); - stringstream keystream; - keystream << host << ":" << port; - string key = string(keystream.str()); + string key = createKey(host, port); LinkMap::iterator i = links.find(key); if (i == links.end()) @@ -168,12 +166,10 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, Mutex::ScopedLock locker(lock); QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); - stringstream keystream; - keystream << host << ":" << port; - string linkKey = string(keystream.str()); - - keystream << "!" << src << "!" << dest << "!" << key; - string bridgeKey = string(keystream.str()); + string linkKey = createKey(host, port); + stringstream keystream; + keystream << linkKey << "!" << src << "!" << dest << "!" << key; + string bridgeKey = keystream.str(); LinkMap::iterator l = links.find(linkKey); if (l == links.end()) @@ -210,9 +206,7 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, void LinkRegistry::destroy(const string& host, const uint16_t port) { Mutex::ScopedLock locker(lock); - stringstream keystream; - keystream << host << ":" << port; - string key = string(keystream.str()); + string key = createKey(host, port); LinkMap::iterator i = links.find(key); if (i != links.end()) @@ -231,16 +225,15 @@ void LinkRegistry::destroy(const std::string& host, const std::string& key) { Mutex::ScopedLock locker(lock); - stringstream keystream; - keystream << host << ":" << port; - string linkKey = string(keystream.str()); + string linkKey = createKey(host, port); + stringstream keystream; + keystream << linkKey << "!" << src << "!" << dest << "!" << key; + string bridgeKey = keystream.str(); LinkMap::iterator l = links.find(linkKey); if (l == links.end()) return; - keystream << "!" << src << "!" << dest << "!" << key; - string bridgeKey = string(keystream.str()); BridgeMap::iterator b = bridges.find(bridgeKey); if (b == bridges.end()) return; @@ -328,11 +321,18 @@ std::string LinkRegistry::getAuthIdentity(const std::string& key) } -std::string LinkRegistry::createKey(const qpid::TcpAddress& a) -{ - stringstream keystream; - keystream << a.host << ":" << a.port; - return string(keystream.str()); +std::string LinkRegistry::createKey(const qpid::Address& a) { + // TODO aconway 2010-05-11: key should also include protocol/transport to + // be unique. Requires refactor of LinkRegistry interface. + return createKey(a.host, a.port); +} + +std::string LinkRegistry::createKey(const std::string& host, uint16_t port) { + // TODO aconway 2010-05-11: key should also include protocol/transport to + // be unique. Requires refactor of LinkRegistry interface. + stringstream keystream; + keystream << host << ":" << port; + return keystream.str(); } void LinkRegistry::setPassive(bool p) diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 09a89298b6..52ab700cfc 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -53,7 +53,7 @@ namespace broker { typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; - typedef std::map<std::string, TcpAddress> AddressMap; + typedef std::map<std::string, Address> AddressMap; LinkMap links; LinkMap linksToDestroy; @@ -72,9 +72,10 @@ namespace broker { std::string realm; void periodicMaintenance (); - bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); + bool updateAddress(const std::string& oldKey, const Address& newAddress); boost::shared_ptr<Link> findLink(const std::string& key); - static std::string createKey(const TcpAddress& address); + static std::string createKey(const Address& address); + static std::string createKey(const std::string& host, uint16_t port); public: LinkRegistry (); // Only used in store tests @@ -135,7 +136,7 @@ namespace broker { /** * Called by links failing over to new address */ - void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress); + void changeAddress(const Address& oldAddress, const Address& newAddress); /** * Called to alter passive state. In passive state the links * and bridges managed by a link registry will be recorded and diff --git a/cpp/src/qpid/broker/RetryList.cpp b/cpp/src/qpid/broker/RetryList.cpp index 8f600c086d..b0477dd0f7 100644 --- a/cpp/src/qpid/broker/RetryList.cpp +++ b/cpp/src/qpid/broker/RetryList.cpp @@ -31,20 +31,16 @@ void RetryList::reset(const std::vector<Url>& u) urlIndex = addressIndex = 0;//reset indices } -bool RetryList::next(TcpAddress& address) +bool RetryList::next(Address& address) { while (urlIndex < urls.size()) { - while (addressIndex < urls[urlIndex].size()) { - const TcpAddress* tcp = urls[urlIndex][addressIndex++].get<TcpAddress>(); - if (tcp) { - address = *tcp; - return true; - } + if (addressIndex < urls[urlIndex].size()) { + address = urls[urlIndex][addressIndex++]; + return true; } urlIndex++; addressIndex = 0; } - urlIndex = addressIndex = 0;//reset indices return false; } diff --git a/cpp/src/qpid/broker/RetryList.h b/cpp/src/qpid/broker/RetryList.h index f87adf2c8d..242a7d2122 100644 --- a/cpp/src/qpid/broker/RetryList.h +++ b/cpp/src/qpid/broker/RetryList.h @@ -38,7 +38,7 @@ class RetryList public: QPID_BROKER_EXTERN RetryList(); QPID_BROKER_EXTERN void reset(const std::vector<Url>& urls); - QPID_BROKER_EXTERN bool next(TcpAddress& address); + QPID_BROKER_EXTERN bool next(Address& address); private: std::vector<Url> urls; size_t urlIndex; diff --git a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp index 0db8fb5713..62122cbaa9 100644 --- a/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp +++ b/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp @@ -52,7 +52,7 @@ struct SslServerOptions : qpid::Options SslServerOptions() : qpid::Options("SSL Options"),
certStore("My"), port(5671), clientAuth(false)
{
- qpid::TcpAddress me;
+ qpid::Address me;
if (qpid::sys::SystemInfo::getLocalHostname(me))
certName = me.host;
else
|
