diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.h | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionHandler.h | 10 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/run_federation_tests | 15 |
6 files changed, 37 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 9a1f4be468..5b531e4636 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l, InitializeCallback init) : link(_link), id(_id), args(_args), mgmtObject(0), listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0), - initialize(init) + initialize(init), detached(false) { std::stringstream title; title << id << "_" << name; @@ -85,11 +85,14 @@ Bridge::~Bridge() void Bridge::create(Connection& c) { + detached = false; // Reset detached in case we are recovering. connState = &c; conn = &c; FieldTable options; if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync); SessionHandler& sessionHandler = c.getChannel(id); + sessionHandler.setDetachedCallback( + boost::bind(&Bridge::sessionDetached, shared_from_this())); if (args.i_srcIsLocal) { if (args.i_dynamic) throw Exception("Dynamic routing not supported for push routes"); @@ -179,12 +182,6 @@ void Bridge::destroy() listener(this); } -bool Bridge::isSessionReady() const -{ - SessionHandler& sessionHandler = conn->getChannel(id); - return sessionHandler.ready(); -} - void Bridge::setPersistenceId(uint64_t pId) const { persistenceId = pId; @@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() const return link->getBroker()->getFederationTag(); } +void Bridge::sessionDetached() { + detached = true; +} + }} diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index b849b11ba8..32b9fd1781 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -33,6 +33,7 @@ #include "qmf/org/apache/qpid/broker/Bridge.h" #include <boost/function.hpp> +#include <boost/enable_shared_from_this.hpp> #include <memory> namespace qpid { @@ -44,7 +45,10 @@ class Link; class LinkRegistry; class SessionHandler; -class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge +class Bridge : public PersistableConfig, + public management::Manageable, + public Exchange::DynamicBridge, + public boost::enable_shared_from_this<Bridge> { public: typedef boost::shared_ptr<Bridge> shared_ptr; @@ -63,7 +67,7 @@ public: void destroy(); bool isDurable() { return args.i_durable; } - bool isSessionReady() const; + bool isDetached() const { return detached; } management::ManagementObject* GetManagementObject() const; management::Manageable::status_t ManagementMethod(uint32_t methodId, @@ -90,6 +94,9 @@ public: const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; } private: + // Callback when the bridge's session is detached. + void sessionDetached(); + struct PushHandler : framing::FrameHandler { PushHandler(Connection* c) { conn = c; } void handle(framing::AMQFrame& frame); @@ -112,7 +119,7 @@ private: ConnectionState* connState; Connection* conn; InitializeCallback initialize; - + bool detached; // Set when session is detached. bool resetProxy(); }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index c90e748077..855063a6ad 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -312,7 +312,7 @@ void Link::ioThreadProcessing() // check for bridge session errors and recover if (!active.empty()) { Bridges::iterator removed = std::remove_if( - active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1)); + active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1)); for (Bridges::iterator i = removed; i != active.end(); ++i) { Bridge::shared_ptr bridge = *i; bridge->closed(); diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 752fa55535..b58c7c01c5 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -64,6 +64,7 @@ void SessionHandler::handleDetach() { if (session.get()) connection.getBroker().getSessionManager().detach(session); assert(!session.get()); + if (detachedCallback) detachedCallback(); connection.closeChannel(channel.get()); } @@ -117,4 +118,8 @@ void SessionHandler::attached(const std::string& name) } } +void SessionHandler::setDetachedCallback(boost::function<void()> cb) { + detachedCallback = cb; +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index 8cd5072574..4e2cfaa963 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,6 +25,7 @@ #include "qpid/amqp_0_10/SessionHandler.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/AMQP_ClientProxy.h" +#include <boost/function.hpp> namespace qpid { class SessionState; @@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { * This proxy is for sending such commands. In a clustered broker it will take steps * to synchronize command order across the cluster. In a stand-alone broker * it is just a synonym for getProxy() - */ + */ framing::AMQP_ClientProxy& getClusterOrderProxy() { return clusterOrderProxy.get() ? *clusterOrderProxy : proxy; } @@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10::SessionHandler { void attached(const std::string& name);//used by 'pushing' inter-broker bridges void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges + void setDetachedCallback(boost::function<void()> cb); + protected: virtual void setState(const std::string& sessionName, bool force); virtual qpid::SessionState* getState(); @@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10::SessionHandler { framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; std::auto_ptr<SetChannelProxy> clusterOrderProxy; + boost::function<void ()> detachedCallback; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests index b71fa14c47..7735b559cf 100755 --- a/qpid/cpp/src/tests/run_federation_tests +++ b/qpid/cpp/src/tests/run_federation_tests @@ -33,16 +33,13 @@ else SKIPTESTS='-i *_xml' # note: single quotes prevent expansion of * fi +QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file" start_brokers() { - ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port - LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port - REMOTE_PORT=`cat qpidd.port` - - ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port - REMOTE_B1=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port - REMOTE_B2=`cat qpidd.port` + rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log + LOCAL_PORT=$($QPIDD_CMD fed_local.log) + REMOTE_PORT=$($QPIDD_CMD fed_remote.log) + REMOTE_B1=$($QPIDD_CMD fed_b1.log) + REMOTE_B2=$($QPIDD_CMD fed_b2.log) } stop_brokers() { |
