From 7e6635d7e297f6ecf2199028da573bc10437f3ae Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 13 May 2008 21:38:28 +0000 Subject: QPID-990: Patch from Ted Ross to enable persisting of inter-broker routing entities git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@656023 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/LinkRegistry.cpp | 95 ++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 5 deletions(-) (limited to 'cpp/src/qpid/broker/LinkRegistry.cpp') diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 6e20a3f7ce..be3c67077e 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -46,15 +46,21 @@ void LinkRegistry::Periodic::fire () void LinkRegistry::periodicMaintenance () { Mutex::ScopedLock locker(lock); + linksToDestroy.clear(); + bridgesToDestroy.clear(); for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); } -pair LinkRegistry::declare(std::string& host, - uint16_t port, - bool useSsl, - bool durable) +pair LinkRegistry::declare(string& host, + uint16_t port, + bool useSsl, + bool durable, + string& authMechanism, + string& username, + string& password) + { Mutex::ScopedLock locker(lock); stringstream keystream; @@ -66,13 +72,64 @@ pair LinkRegistry::declare(std::string& host, { Link::shared_ptr link; - link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent)); + link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable, + authMechanism, username, password, + broker, parent)); links[key] = link; return std::pair(link, true); } return std::pair(i->second, false); } +pair LinkRegistry::declare(std::string& host, + uint16_t port, + bool durable, + std::string& src, + std::string& dest, + std::string& key, + bool is_queue, + bool is_local, + std::string& tag, + std::string& excludes) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return pair(Bridge::shared_ptr(), false); + + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + { + management::ArgsLinkBridge args; + Bridge::shared_ptr bridge; + + args.i_durable = durable; + args.i_src = src; + args.i_dest = dest; + args.i_key = key; + args.i_src_is_queue = is_queue; + args.i_src_is_local = is_local; + args.i_tag = tag; + args.i_excludes = excludes; + + bridge = Bridge::shared_ptr + (new Bridge (l->second.get(), l->second->nextChannel(), + boost::bind(&LinkRegistry::destroy, this, + host, port, src, dest, key), args)); + bridges[bridgeKey] = bridge; + l->second->add(bridge); + return std::pair(bridge, true); + } + return std::pair(b->second, false); +} + void LinkRegistry::destroy(const string& host, const uint16_t port) { Mutex::ScopedLock locker(lock); @@ -90,6 +147,34 @@ void LinkRegistry::destroy(const string& host, const uint16_t port) } } +void LinkRegistry::destroy(const std::string& host, + const uint16_t port, + const std::string& src, + const std::string& dest, + const std::string& key) +{ + Mutex::ScopedLock locker(lock); + stringstream keystream; + keystream << host << ":" << port; + string linkKey = string(keystream.str()); + + LinkMap::iterator l = links.find(linkKey); + if (l == links.end()) + return; + + keystream << "!" << src << "!" << dest << "!" << key; + string bridgeKey = string(keystream.str()); + BridgeMap::iterator b = bridges.find(bridgeKey); + if (b == bridges.end()) + return; + + l->second->cancel(b->second); + if (b->second->isDurable()) + store->destroy(*(b->second)); + bridgesToDestroy[bridgeKey] = b->second; + bridges.erase(b); +} + void LinkRegistry::setStore (MessageStore* _store) { assert (store == 0 && _store != 0); -- cgit v1.2.1