summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-16 17:07:26 +0000
committerAlan Conway <aconway@apache.org>2008-10-16 17:07:26 +0000
commitd39a165c9c8d1fa2fd728a2237117efa71848874 (patch)
treedd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/client
parentf7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff)
downloadqpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp18
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h2
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp14
-rw-r--r--cpp/src/qpid/client/FailoverListener.cpp43
-rw-r--r--cpp/src/qpid/client/FailoverListener.h5
5 files changed, 53 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index bb3517f839..652d59f448 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -32,17 +32,18 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-using namespace qpid::client;
+
+namespace qpid {
+namespace client {
+
using namespace qpid::framing;
using namespace qpid::framing::connection;
using namespace qpid::sys;
-
using namespace qpid::framing::connection;//for connection error codes
ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
: Bounds(settings.maxFrameSize * settings.bounds),
handler(settings, v),
- failover(new FailoverListener()),
version(v),
nextChannel(1)
{
@@ -51,7 +52,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti
handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
CLOSE_CODE_NORMAL, std::string());
-
//only set error handler once open
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
}
@@ -69,7 +69,8 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, u
Mutex::ScopedLock l(lock);
session->setChannel(channel ? channel : nextChannel++);
boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
- if (s.lock()) throw SessionBusyException();
+ boost::shared_ptr<SessionImpl> ss = s.lock();
+ if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attachd to " << ss->getId()));
s = session;
}
@@ -110,7 +111,7 @@ void ConnectionImpl::open()
connector->init();
handler.waitForOpen();
- if (failover.get()) failover->start(shared_from_this());
+ failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls));
}
void ConnectionImpl::idleIn()
@@ -176,7 +177,6 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
}
std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
- // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url.
return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
}
@@ -187,4 +187,6 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na
return simpl;
}
-void ConnectionImpl::stopFailoverListener() { failover.reset(); }
+void ConnectionImpl::stopFailoverListener() { failover->stop(); }
+
+}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index a432afff4f..c2cc6f006d 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -89,6 +89,8 @@ class ConnectionImpl : public Bounds,
std::vector<Url> getKnownBrokers();
void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; }
void stopFailoverListener();
+
+ framing::ProtocolVersion getVersion() { return version; }
};
}}
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index fd9d8a8ad1..32d0001040 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -72,7 +72,7 @@ void Dispatcher::run()
boost::state_saver<bool> reset(running); // Reset to false on exit.
running = true;
try {
- while (!queue->isClosed()) {
+ while (true) {
Mutex::ScopedUnlock u(lock);
FrameSet::shared_ptr content = queue->pop();
if (content->isA<MessageTransferBody>()) {
@@ -92,12 +92,14 @@ void Dispatcher::run()
}
}
}
- session.sync(); // Make sure all our acks are received before returning.
}
- catch (const ClosedException& e)
- {
- QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what());
- } //ignore it and return
+ catch (const ClosedException& e) {
+ QPID_LOG(debug, "Dispatch thread exiting, session closed: " << session.getId());
+ try {
+ session.sync(); // Make sure all our acks are received before returning.
+ }
+ catch(...) {}
+ }
catch (const std::exception& e) {
QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
if ( failoverHandler )
diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp
index e13f240439..98df12fc57 100644
--- a/cpp/src/qpid/client/FailoverListener.cpp
+++ b/cpp/src/qpid/client/FailoverListener.cpp
@@ -21,6 +21,7 @@
#include "FailoverListener.h"
#include "SessionBase_0_10Access.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/log/Helpers.h"
@@ -40,40 +41,58 @@ static Session makeSession(boost::shared_ptr<SessionImpl> si) {
return s;
}
-FailoverListener::FailoverListener() {}
+FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls)
+ : knownBrokers(initUrls)
+ {
+ // Special versions used to mark cluster catch-up connections
+ // which do not need a FailoverListener
+ if (c->getVersion().getMajor() >= 0x80) {
+ QPID_LOG(debug, "No failover listener for catch-up connection.");
+ return;
+ }
-void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) {
- Session session = makeSession(c->newSession(std::string(), 0));
+ Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
session.close();
return;
}
subscriptions.reset(new SubscriptionManager(session));
- std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
+ std::string qname=session.getId().getName();
session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
subscriptions->subscribe(*this, qname, FlowControl::unlimited());
thread = sys::Thread(*subscriptions);
}
-void FailoverListener::stop() {
- if (subscriptions.get()) subscriptions->stop();
- if (thread.id()) thread.join();
- if (subscriptions.get()) subscriptions->getSession().close();
- thread=sys::Thread();
- subscriptions.reset();
-}
FailoverListener::~FailoverListener() {
try { stop(); }
catch (const std::exception& e) {}
}
+void FailoverListener::stop() {
+ if (subscriptions.get())
+ subscriptions->stop();
+
+ if (thread.id() == sys::Thread::current().id()) {
+ // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
+ // dtor runs when my session drops its weak pointer lock.
+ // For now, leak subscriptions to prevent a core if we delete
+ // without joining.
+ subscriptions.release();
+ }
+ else if (thread.id()) {
+ thread.join();
+ thread=sys::Thread();
+ subscriptions.reset(); // Safe to delete after join.
+ }
+}
+
void FailoverListener::received(Message& msg) {
sys::Mutex::ScopedLock l(lock);
knownBrokers.clear();
framing::Array urlArray;
msg.getHeaders().getArray("amq.failover", urlArray);
- for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i )
+ for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i )
knownBrokers.push_back(Url((*i)->get<std::string>()));
QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
}
diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h
index c702fed846..fc0cca28f1 100644
--- a/cpp/src/qpid/client/FailoverListener.h
+++ b/cpp/src/qpid/client/FailoverListener.h
@@ -38,11 +38,10 @@ class SubscriptionManager;
*/
class FailoverListener : public MessageListener {
public:
- FailoverListener();
+ FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls);
~FailoverListener();
- void start(const boost::shared_ptr<ConnectionImpl>&);
void stop();
-
+
std::vector<Url> getKnownBrokers() const;
void received(Message& msg);