diff options
| author | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-10-16 17:07:26 +0000 |
| commit | d39a165c9c8d1fa2fd728a2237117efa71848874 (patch) | |
| tree | dd07b81f1f2d2de42ce2fdf28432130566a5622e /cpp/src/qpid/client | |
| parent | f7a4f7bcf77726767d0905f56f5c44c7a34d82a3 (diff) | |
| download | qpid-python-d39a165c9c8d1fa2fd728a2237117efa71848874.tar.gz | |
Fix race in cluster causing incorrect known-broker lists to be sent to clients.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705287 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Dispatcher.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverListener.cpp | 43 | ||||
| -rw-r--r-- | cpp/src/qpid/client/FailoverListener.h | 5 |
5 files changed, 53 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index bb3517f839..652d59f448 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -32,17 +32,18 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -using namespace qpid::client; + +namespace qpid { +namespace client { + using namespace qpid::framing; using namespace qpid::framing::connection; using namespace qpid::sys; - using namespace qpid::framing::connection;//for connection error codes ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), - failover(new FailoverListener()), version(v), nextChannel(1) { @@ -51,7 +52,6 @@ ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSetti handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, CLOSE_CODE_NORMAL, std::string()); - //only set error handler once open handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); } @@ -69,7 +69,8 @@ void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, u Mutex::ScopedLock l(lock); session->setChannel(channel ? channel : nextChannel++); boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()]; - if (s.lock()) throw SessionBusyException(); + boost::shared_ptr<SessionImpl> ss = s.lock(); + if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() << " attachd to " << ss->getId())); s = session; } @@ -110,7 +111,7 @@ void ConnectionImpl::open() connector->init(); handler.waitForOpen(); - if (failover.get()) failover->start(shared_from_this()); + failover.reset(new FailoverListener(shared_from_this(), handler.knownBrokersUrls)); } void ConnectionImpl::idleIn() @@ -176,7 +177,6 @@ const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() } std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { - // FIXME aconway 2008-10-08: ensure we never return empty list, always include self Url. return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } @@ -187,4 +187,6 @@ boost::shared_ptr<SessionImpl> ConnectionImpl::newSession(const std::string& na return simpl; } -void ConnectionImpl::stopFailoverListener() { failover.reset(); } +void ConnectionImpl::stopFailoverListener() { failover->stop(); } + +}} // namespace qpid::client diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index a432afff4f..c2cc6f006d 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -89,6 +89,8 @@ class ConnectionImpl : public Bounds, std::vector<Url> getKnownBrokers(); void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback = fn; } void stopFailoverListener(); + + framing::ProtocolVersion getVersion() { return version; } }; }} diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp index fd9d8a8ad1..32d0001040 100644 --- a/cpp/src/qpid/client/Dispatcher.cpp +++ b/cpp/src/qpid/client/Dispatcher.cpp @@ -72,7 +72,7 @@ void Dispatcher::run() boost::state_saver<bool> reset(running); // Reset to false on exit. running = true; try { - while (!queue->isClosed()) { + while (true) { Mutex::ScopedUnlock u(lock); FrameSet::shared_ptr content = queue->pop(); if (content->isA<MessageTransferBody>()) { @@ -92,12 +92,14 @@ void Dispatcher::run() } } } - session.sync(); // Make sure all our acks are received before returning. } - catch (const ClosedException& e) - { - QPID_LOG(debug, "Ignored exception in client dispatch thread: " << e.what()); - } //ignore it and return + catch (const ClosedException& e) { + QPID_LOG(debug, "Dispatch thread exiting, session closed: " << session.getId()); + try { + session.sync(); // Make sure all our acks are received before returning. + } + catch(...) {} + } catch (const std::exception& e) { QPID_LOG(error, "Exception in client dispatch thread: " << e.what()); if ( failoverHandler ) diff --git a/cpp/src/qpid/client/FailoverListener.cpp b/cpp/src/qpid/client/FailoverListener.cpp index e13f240439..98df12fc57 100644 --- a/cpp/src/qpid/client/FailoverListener.cpp +++ b/cpp/src/qpid/client/FailoverListener.cpp @@ -21,6 +21,7 @@ #include "FailoverListener.h" #include "SessionBase_0_10Access.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/log/Helpers.h" @@ -40,40 +41,58 @@ static Session makeSession(boost::shared_ptr<SessionImpl> si) { return s; } -FailoverListener::FailoverListener() {} +FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, const std::vector<Url>& initUrls) + : knownBrokers(initUrls) + { + // Special versions used to mark cluster catch-up connections + // which do not need a FailoverListener + if (c->getVersion().getMajor() >= 0x80) { + QPID_LOG(debug, "No failover listener for catch-up connection."); + return; + } -void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) { - Session session = makeSession(c->newSession(std::string(), 0)); + Session session = makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0)); if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) { session.close(); return; } subscriptions.reset(new SubscriptionManager(session)); - std::string qname=AMQ_FAILOVER + "." + session.getId().getName(); + std::string qname=session.getId().getName(); session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true); session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER); subscriptions->subscribe(*this, qname, FlowControl::unlimited()); thread = sys::Thread(*subscriptions); } -void FailoverListener::stop() { - if (subscriptions.get()) subscriptions->stop(); - if (thread.id()) thread.join(); - if (subscriptions.get()) subscriptions->getSession().close(); - thread=sys::Thread(); - subscriptions.reset(); -} FailoverListener::~FailoverListener() { try { stop(); } catch (const std::exception& e) {} } +void FailoverListener::stop() { + if (subscriptions.get()) + subscriptions->stop(); + + if (thread.id() == sys::Thread::current().id()) { + // FIXME aconway 2008-10-16: this can happen if ConnectionImpl + // dtor runs when my session drops its weak pointer lock. + // For now, leak subscriptions to prevent a core if we delete + // without joining. + subscriptions.release(); + } + else if (thread.id()) { + thread.join(); + thread=sys::Thread(); + subscriptions.reset(); // Safe to delete after join. + } +} + void FailoverListener::received(Message& msg) { sys::Mutex::ScopedLock l(lock); knownBrokers.clear(); framing::Array urlArray; msg.getHeaders().getArray("amq.failover", urlArray); - for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < urlArray.end(); ++i ) + for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i != urlArray.end(); ++i ) knownBrokers.push_back(Url((*i)->get<std::string>())); QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers)); } diff --git a/cpp/src/qpid/client/FailoverListener.h b/cpp/src/qpid/client/FailoverListener.h index c702fed846..fc0cca28f1 100644 --- a/cpp/src/qpid/client/FailoverListener.h +++ b/cpp/src/qpid/client/FailoverListener.h @@ -38,11 +38,10 @@ class SubscriptionManager; */ class FailoverListener : public MessageListener { public: - FailoverListener(); + FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>& initUrls); ~FailoverListener(); - void start(const boost::shared_ptr<ConnectionImpl>&); void stop(); - + std::vector<Url> getKnownBrokers() const; void received(Message& msg); |
