summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h10
-rwxr-xr-xqpid/cpp/src/tests/run_federation_tests15
6 files changed, 37 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 9a1f4be468..5b531e4636 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -62,7 +62,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
InitializeCallback init) :
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("qpid.bridge_queue_"), persistenceId(0),
- initialize(init)
+ initialize(init), detached(false)
{
std::stringstream title;
title << id << "_" << name;
@@ -85,11 +85,14 @@ Bridge::~Bridge()
void Bridge::create(Connection& c)
{
+ detached = false; // Reset detached in case we are recovering.
connState = &c;
conn = &c;
FieldTable options;
if (args.i_sync) options.setInt("qpid.sync_frequency", args.i_sync);
SessionHandler& sessionHandler = c.getChannel(id);
+ sessionHandler.setDetachedCallback(
+ boost::bind(&Bridge::sessionDetached, shared_from_this()));
if (args.i_srcIsLocal) {
if (args.i_dynamic)
throw Exception("Dynamic routing not supported for push routes");
@@ -179,12 +182,6 @@ void Bridge::destroy()
listener(this);
}
-bool Bridge::isSessionReady() const
-{
- SessionHandler& sessionHandler = conn->getChannel(id);
- return sessionHandler.ready();
-}
-
void Bridge::setPersistenceId(uint64_t pId) const
{
persistenceId = pId;
@@ -336,4 +333,8 @@ const string& Bridge::getLocalTag() const
return link->getBroker()->getFederationTag();
}
+void Bridge::sessionDetached() {
+ detached = true;
+}
+
}}
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index b849b11ba8..32b9fd1781 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -33,6 +33,7 @@
#include "qmf/org/apache/qpid/broker/Bridge.h"
#include <boost/function.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <memory>
namespace qpid {
@@ -44,7 +45,10 @@ class Link;
class LinkRegistry;
class SessionHandler;
-class Bridge : public PersistableConfig, public management::Manageable, public Exchange::DynamicBridge
+class Bridge : public PersistableConfig,
+ public management::Manageable,
+ public Exchange::DynamicBridge,
+ public boost::enable_shared_from_this<Bridge>
{
public:
typedef boost::shared_ptr<Bridge> shared_ptr;
@@ -63,7 +67,7 @@ public:
void destroy();
bool isDurable() { return args.i_durable; }
- bool isSessionReady() const;
+ bool isDetached() const { return detached; }
management::ManagementObject* GetManagementObject() const;
management::Manageable::status_t ManagementMethod(uint32_t methodId,
@@ -90,6 +94,9 @@ public:
const qmf::org::apache::qpid::broker::ArgsLinkBridge& getArgs() { return args; }
private:
+ // Callback when the bridge's session is detached.
+ void sessionDetached();
+
struct PushHandler : framing::FrameHandler {
PushHandler(Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
@@ -112,7 +119,7 @@ private:
ConnectionState* connState;
Connection* conn;
InitializeCallback initialize;
-
+ bool detached; // Set when session is detached.
bool resetProxy();
};
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index c90e748077..855063a6ad 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -312,7 +312,7 @@ void Link::ioThreadProcessing()
// check for bridge session errors and recover
if (!active.empty()) {
Bridges::iterator removed = std::remove_if(
- active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
+ active.begin(), active.end(), boost::bind(&Bridge::isDetached, _1));
for (Bridges::iterator i = removed; i != active.end(); ++i) {
Bridge::shared_ptr bridge = *i;
bridge->closed();
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 752fa55535..b58c7c01c5 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -64,6 +64,7 @@ void SessionHandler::handleDetach() {
if (session.get())
connection.getBroker().getSessionManager().detach(session);
assert(!session.get());
+ if (detachedCallback) detachedCallback();
connection.closeChannel(channel.get());
}
@@ -117,4 +118,8 @@ void SessionHandler::attached(const std::string& name)
}
}
+void SessionHandler::setDetachedCallback(boost::function<void()> cb) {
+ detachedCallback = cb;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index 8cd5072574..4e2cfaa963 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,6 +25,7 @@
#include "qpid/amqp_0_10/SessionHandler.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/AMQP_ClientProxy.h"
+#include <boost/function.hpp>
namespace qpid {
class SessionState;
@@ -61,7 +62,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
* This proxy is for sending such commands. In a clustered broker it will take steps
* to synchronize command order across the cluster. In a stand-alone broker
* it is just a synonym for getProxy()
- */
+ */
framing::AMQP_ClientProxy& getClusterOrderProxy() {
return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
}
@@ -70,6 +71,8 @@ class SessionHandler : public amqp_0_10::SessionHandler {
void attached(const std::string& name);//used by 'pushing' inter-broker bridges
void attachAs(const std::string& name);//used by 'pulling' inter-broker bridges
+ void setDetachedCallback(boost::function<void()> cb);
+
protected:
virtual void setState(const std::string& sessionName, bool force);
virtual qpid::SessionState* getState();
@@ -91,6 +94,7 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
std::auto_ptr<SetChannelProxy> clusterOrderProxy;
+ boost::function<void ()> detachedCallback;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/tests/run_federation_tests b/qpid/cpp/src/tests/run_federation_tests
index b71fa14c47..7735b559cf 100755
--- a/qpid/cpp/src/tests/run_federation_tests
+++ b/qpid/cpp/src/tests/run_federation_tests
@@ -33,16 +33,13 @@ else
SKIPTESTS='-i *_xml' # note: single quotes prevent expansion of *
fi
+QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file"
start_brokers() {
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- LOCAL_PORT=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_PORT=`cat qpidd.port`
-
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_B1=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no > qpidd.port
- REMOTE_B2=`cat qpidd.port`
+ rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log
+ LOCAL_PORT=$($QPIDD_CMD fed_local.log)
+ REMOTE_PORT=$($QPIDD_CMD fed_remote.log)
+ REMOTE_B1=$($QPIDD_CMD fed_b1.log)
+ REMOTE_B2=$($QPIDD_CMD fed_b2.log)
}
stop_brokers() {