diff options
| author | Gordon Sim <gsim@apache.org> | 2009-01-22 22:53:50 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-01-22 22:53:50 +0000 |
| commit | d7ce27f7cc96894f149e5c20c03b306b80636727 (patch) | |
| tree | 22caa566993da19f9e211f69fdca64c13f1f04e6 /cpp/src/qpid/broker | |
| parent | 74481dd2b6b97374bd4f260ca89d9103ce6383ed (diff) | |
| download | qpid-python-d7ce27f7cc96894f149e5c20c03b306b80636727.tar.gz | |
QPID-1567: More changes to make clustering and federation work together
* replicate outgoing link traffic to all nodes
* coordinate amongst nodes so that only one node actually maintains active links
with the others able to take over if that node fails
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@736841 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Bridge.cpp | 7 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Link.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 12 |
6 files changed, 58 insertions, 3 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 6129f13ede..38a9b5d64c 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -72,6 +72,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, if (!args.i_durable) agent->addObject(mgmtObject); } + QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); } Bridge::~Bridge() @@ -104,10 +105,11 @@ void Bridge::create(ConnectionState& c) session->attach(name, false); session->commandPoint(0,0); - if (args.i_srcIsQueue) { + if (args.i_srcIsQueue) { peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); } else { FieldTable queueSettings; @@ -141,6 +143,9 @@ void Bridge::create(ConnectionState& c) if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); exchange->registerDynamicBridge(this); + QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src); + } else { + QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); } } } diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 5cbff57788..559cd4cfe3 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -117,7 +117,7 @@ class Connection : public sys::ConnectionInputHandler, ChannelMap channels; //framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; - bool isLink; + const bool isLink; bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 2bd15759ef..835b37e6eb 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -106,6 +106,7 @@ void Link::setStateLH (int newState) case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; case STATE_FAILED : mgmtObject->set_state("Failed"); break; case STATE_CLOSED : mgmtObject->set_state("Closed"); break; + case STATE_PASSIVE : mgmtObject->set_state("Passive"); break; } } @@ -239,6 +240,7 @@ void Link::ioThreadProcessing() if (state != STATE_OPERATIONAL) return; + QPID_LOG(debug, "Link::ioThreadProcessing()"); //process any pending creates if (!created.empty()) { @@ -404,6 +406,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te case _qmf::Link::METHOD_BRIDGE : _qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args; + QPID_LOG(debug, "Link::bridge() request received"); // Durable bridges are only valid on durable links if (iargs.i_durable && !durable) { @@ -437,3 +440,17 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te return Manageable::STATUS_UNKNOWN_METHOD; } + +void Link::setPassive(bool passive) +{ + Mutex::ScopedLock mutex(lock); + if (passive) { + setStateLH(STATE_PASSIVE); + } else { + if (state == STATE_PASSIVE) { + setStateLH(STATE_WAITING); + } else { + QPID_LOG(warning, "Ignoring attempt to activate non-passive link"); + } + } +} diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 6fef694663..8e741c6eb7 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -76,6 +76,7 @@ namespace qpid { static const int STATE_OPERATIONAL = 3; static const int STATE_FAILED = 4; static const int STATE_CLOSED = 5; + static const int STATE_PASSIVE = 6; static const uint32_t MAX_INTERVAL = 32; @@ -120,6 +121,7 @@ namespace qpid { Broker* getBroker() { return broker; } void notifyConnectionForced(const std::string text); + void setPassive(bool p); // PersistableConfig: void setPersistenceId(uint64_t id) const; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index f400f2066a..956a9ea5ae 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -31,7 +31,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false) { timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); } @@ -51,6 +51,14 @@ void LinkRegistry::periodicMaintenance () linksToDestroy.clear(); bridgesToDestroy.clear(); + if (passiveChanged) { + if (passive) { QPID_LOG(info, "Passivating links"); } + else { QPID_LOG(info, "Activating links"); } + for (LinkMap::iterator i = links.begin(); i != links.end(); i++) { + i->second->setPassive(passive); + } + passiveChanged = false; + } for (LinkMap::iterator i = links.begin(); i != links.end(); i++) i->second->maintenanceVisit(); //now process any requests for re-addressing @@ -109,6 +117,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host, link = Link::shared_ptr (new Link (this, store, host, port, transport, durable, authMechanism, username, password, broker, parent)); + if (passive) link->setPassive(true); links[key] = link; return std::pair<Link::shared_ptr, bool>(link, true); } @@ -129,6 +138,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host, uint16_t sync) { Mutex::ScopedLock locker(lock); + QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")"); + stringstream keystream; keystream << host << ":" << port; string linkKey = string(keystream.str()); @@ -291,3 +302,11 @@ std::string LinkRegistry::createKey(const TcpAddress& a) keystream << a.host << ":" << a.port; return string(keystream.str()); } + +void LinkRegistry::setPassive(bool p) +{ + Mutex::ScopedLock locker(lock); + passiveChanged = p != passive; + passive = p; + //will activate or passivate links on maintenance visit +} diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 6e228c0e2c..884228bd63 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -64,6 +64,8 @@ namespace broker { Timer timer; management::Manageable* parent; MessageStore* store; + bool passive; + bool passiveChanged; void periodicMaintenance (); bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress); @@ -122,7 +124,17 @@ namespace broker { std::string getAuthCredentials (const std::string& key); std::string getAuthIdentity (const std::string& key); + /** + * Called by links failing over to new address + */ void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress); + /** + * Called to alter passive state. In passive state the links + * and bridges managed by a link registry will be recorded and + * updated but links won't actually establish connections and + * bridges won't therefore pull or push any messages. + */ + void setPassive(bool); }; } } |
