diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/SessionImpl.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableCondition.cpp (renamed from cpp/src/qpid/cluster/PollableCondition.cpp) | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableCondition.h (renamed from cpp/src/qpid/cluster/PollableCondition.h) | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PollableQueue.h (renamed from cpp/src/qpid/cluster/PollableQueue.h) | 12 |
12 files changed, 94 insertions, 78 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 913188845f..027f8a212d 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -70,8 +70,6 @@ SessionState::SessionState( } SessionState::~SessionState() { - // Remove ID from active session list. - broker.getSessionManager().forget(getId()); if (mgmtObject != 0) mgmtObject->resourceDestroy (); } diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp index b736d116e1..7b1cacb640 100644 --- a/cpp/src/qpid/client/SessionImpl.cpp +++ b/cpp/src/qpid/client/SessionImpl.cpp @@ -649,7 +649,7 @@ void SessionImpl::checkOpen() const //call with lock held. { check(); if (state != ATTACHED) { - throw NotAttachedException("Session isn't attached"); + throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't attached")); } } diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce156e85e4..07ed4596e0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(&b), + broker(b), poller(b.getPoller()), cpg(*this), name(name_), @@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ), deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { - broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); + QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + QPID_LOG(debug, "~Cluster()"); +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +// FIXME aconway 2008-09-10: leave is currently not called, +// It should be called if we are shut down by a cluster admin command. +// Any other type of exit is caught in disconnect(). +// void Cluster::leave() { - Mutex::ScopedLock l(lock); - if (!broker) return; // Already left. - // Leave is called by from Broker destructor after the poller has - // been shut down. No dispatches can occur. - - QPID_LOG(notice, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. - while(broker) { - Mutex::ScopedUnlock u(lock); - cpg.dispatchAll(); - } - cpg.shutdown(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -177,6 +172,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); + QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { @@ -238,7 +234,7 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " << AddrList(joined, nJoined) << AddrList(left, nLeft)); if (nJoined) // Notfiy new members of my URL. @@ -246,13 +242,14 @@ void Cluster::configChange( AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); - + if (find(left, left+nLeft, self) != left+nLeft) { + // We have left the group, this is the final config change. + QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); + broker.shutdown(); + } Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - - if (find(left, left+nLeft, self) != left+nLeft) - broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) { h.rewatch(); } -void Cluster::disconnect(sys::DispatchHandle& h) { - h.stopWatch(); - QPID_LOG(critical, "Disconnected from cluster, shutting down"); - broker->shutdown(); +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + broker.shutdown(); } void Cluster::joining(const MemberId& m, const string& url) { - QPID_LOG(notice, "Cluster member " << m << " has URL " << url); + QPID_LOG(info, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } void Cluster::ready(const MemberId& ) { // FIXME aconway 2008-09-08: TODO } - -}} // namespace qpid::cluster +// Called from Broker::~Broker when broker is shut down. At this +// point we know the poller has stopped so no poller callbacks will be +// invoked. We must ensure that CPG has also shut down so no CPG +// callbacks will be invoked. +// +void Cluster::shutdown() { + QPID_LOG(notice, "Cluster member " << self << " shutting down."); + try { cpg.shutdown(); } + catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } + delete this; +} +broker::Broker& Cluster::getBroker(){ return broker; } +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a25b62ea12..3a254684ad 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Event.h" -#include "qpid/cluster/PollableQueue.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" #include "qpid/broker/Broker.h" @@ -43,7 +43,7 @@ class Connection; * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : public RefCounted, private Cpg::Handler +class Cluster : private Cpg::Handler { public: @@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler void joining(const MemberId&, const std::string& url); void ready(const MemberId&); - broker::Broker& getBroker() { assert(broker); return *broker; } - MemberId getSelf() const { return self; } + void shutdown(); + + broker::Broker& getBroker(); + private: typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; - - /** Message sent over the cluster. */ - typedef std::pair<framing::AMQFrame, ConnectionId> Message; - typedef PollableQueue<Event> EventQueue; + typedef sys::PollableQueue<Event> EventQueue; boost::function<void()> shutdownNext; @@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); mutable sys::Monitor lock; // Protect access to members. - broker::Broker* broker; + broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; @@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue<Event> deliverQueue; + EventQueue deliverQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 31447f2fd0..f4128634a6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin { ClusterValues values; ClusterOptions options; - boost::intrusive_ptr<Cluster> cluster; + Cluster* cluster; boost::scoped_ptr<ConnectionCodec::Factory> factory; - ClusterPlugin() : options(values) {} + ClusterPlugin() : options(values), cluster(0) {} Options* getOptions() { return &options; } @@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin { if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); - broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); } void earlyInitialize(Plugin::Target&) {} - - void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. // For test purposes. -boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } +Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 506e982ffd..68d1b16dfa 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) { deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. } + +void Connection::sessionState(const SequenceNumber& /*replayStart*/, + const SequenceSet& /*sentIncomplete*/, + const SequenceNumber& /*expected*/, + const SequenceNumber& /*received*/, + const SequenceSet& /*unknownCompleted*/, + const SequenceSet& /*receivedIncomplete*/) +{ + // FIXME aconway 2008-09-10: TODO +} + +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) +{ + // FIXME aconway 2008-09-10: TODO +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index b3e151ce51..a30350585f 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,9 +40,7 @@ namespace framing { class AMQFrame; } namespace cluster { -/** - * Plug-in associated with broker::Connections, both local and shadow. - */ +/** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : public RefCounted, public sys::ConnectionInputHandler, @@ -90,16 +88,13 @@ class Connection : sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); // State dump methods. - virtual void sessionState(const framing::SequenceNumber& /*replayId*/, - const framing::SequenceNumber& /*sendId*/, - const framing::SequenceSet& /*sentIncomplete*/, - const framing::SequenceNumber& /*expectedId*/, - const framing::SequenceNumber& /*receivedId*/, - const framing::SequenceSet& /*unknownCompleted*/, - const framing::SequenceSet& /*receivedIncomplete*/) {} + virtual void sessionState(const SequenceNumber& replayStart, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - virtual void shadowReady(uint64_t /*clusterId*/, - const std::string& /*userId*/) {} + virtual void shadowReady(uint64_t memberId, uint64_t connectionId); private: void sendDoOutput(); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index f093a0cc1c..6179eab724 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -30,16 +30,16 @@ namespace cluster { sys::ConnectionCodec* ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == framing::ProtocolVersion(0, 10)) + if (v == framing::ProtocolVersion(0, 10)) return new ConnectionCodec(out, id, cluster); return 0; } +// FIXME aconway 2008-08-27: outbound connections need to be made +// with proper qpid::client code for failover, get rid of this +// broker-side hack. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - // FIXME aconway 2008-08-27: outbound connections need to be made - // with proper qpid::client code for failover, get rid of this - // broker-side hack. return next->create(out, id); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 59ce20d821..22d752d174 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec { struct Factory : public sys::ConnectionCodec::Factory { boost::shared_ptr<sys::ConnectionCodec::Factory> next; Cluster& cluster; - Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {} + Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) + : next(f), cluster(c) {} sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/sys/PollableCondition.cpp index eecf95ff8d..5a3bd583cf 100644 --- a/cpp/src/qpid/cluster/PollableCondition.cpp +++ b/cpp/src/qpid/sys/PollableCondition.cpp @@ -27,14 +27,14 @@ // #include "qpid/sys/posix/PrivatePosix.h" -#include "qpid/cluster/PollableCondition.h" +#include "qpid/sys/PollableCondition.h" #include "qpid/Exception.h" #include <unistd.h> #include <fcntl.h> namespace qpid { -namespace cluster { +namespace sys { PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { int fds[2]; @@ -67,13 +67,13 @@ void PollableCondition::set() { #if 0 // FIXME aconway 2008-08-12: More efficient Linux implementation using -// eventfd system call. Do a configure.ac test to enable this when -// eventfd is available. +// eventfd system call. Move to separate file & do configure.ac test +// to enable this when ::eventfd() is available. #include <sys/eventfd.h> namespace qpid { -namespace cluster { +namespace sys { PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { impl->fd = ::eventfd(0, 0); @@ -95,6 +95,6 @@ void PollableCondition::set() { #endif -}} // namespace qpid::cluster +}} // namespace qpid::sys #endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/sys/PollableCondition.h index 6bfca6cabe..6f0e12a474 100644 --- a/cpp/src/qpid/cluster/PollableCondition.h +++ b/cpp/src/qpid/sys/PollableCondition.h @@ -29,7 +29,7 @@ // namespace qpid { -namespace cluster { +namespace sys { /** * A pollable condition to integrate in-process conditions with IO @@ -55,6 +55,6 @@ class PollableCondition : public sys::IOHandle { private: int writeFd; }; -}} // namespace qpid::cluster +}} // namespace qpid::sys #endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/sys/PollableQueue.h index 1c7720f5c6..2e5d3a0d3d 100644 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ b/cpp/src/qpid/sys/PollableQueue.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_POLLABLEQUEUE_H -#define QPID_CLUSTER_POLLABLEQUEUE_H +#ifndef QPID_SYS_POLLABLEQUEUE_H +#define QPID_SYS_POLLABLEQUEUE_H /* * @@ -22,7 +22,7 @@ * */ -#include "qpid/cluster/PollableCondition.h" +#include "qpid/sys/PollableCondition.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Mutex.h" #include <boost/function.hpp> @@ -34,7 +34,7 @@ namespace qpid { namespace sys { class Poller; } -namespace cluster { +namespace sys { // FIXME aconway 2008-08-11: this could be of more general interest, // move to common lib. @@ -108,6 +108,6 @@ template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { batch.clear(); } -}} // namespace qpid::cluster +}} // namespace qpid::sys -#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ +#endif /*!QPID_SYS_POLLABLEQUEUE_H*/ |
