diff options
Diffstat (limited to 'cpp/src/qpid/cluster')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 66 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollableCondition.cpp | 100 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollableCondition.h | 60 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/PollableQueue.h | 113 |
10 files changed, 79 insertions, 334 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index ce156e85e4..07ed4596e0 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - broker(&b), + broker(b), poller(b.getPoller()), cpg(*this), name(name_), @@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : ), deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1))) { - broker->addFinalizer(boost::bind(&Cluster::leave, this)); - QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self); + QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); cpg.join(name); deliverQueue.start(poller); cpgDispatchHandle.startWatch(poller); } -Cluster::~Cluster() {} +Cluster::~Cluster() { + QPID_LOG(debug, "~Cluster()"); +} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } +// FIXME aconway 2008-09-10: leave is currently not called, +// It should be called if we are shut down by a cluster admin command. +// Any other type of exit is caught in disconnect(). +// void Cluster::leave() { - Mutex::ScopedLock l(lock); - if (!broker) return; // Already left. - // Leave is called by from Broker destructor after the poller has - // been shut down. No dispatches can occur. - - QPID_LOG(notice, "Leaving cluster " << name.str()); + QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); cpg.leave(name); - // broker= is set to 0 when the final config-change is delivered. - while(broker) { - Mutex::ScopedUnlock u(lock); - cpg.dispatchAll(); - } - cpg.shutdown(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -177,6 +172,7 @@ void Cluster::deliver( { try { MemberId from(nodeid, pid); + QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10: deliverQueue.push(Event::delivered(from, msg, msg_len)); } catch (const std::exception& e) { @@ -238,7 +234,7 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " + QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " << AddrList(joined, nJoined) << AddrList(left, nLeft)); if (nJoined) // Notfiy new members of my URL. @@ -246,13 +242,14 @@ void Cluster::configChange( AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())), ConnectionId(self,0)); - + if (find(left, left+nLeft, self) != left+nLeft) { + // We have left the group, this is the final config change. + QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); + broker.shutdown(); + } Mutex::ScopedLock l(lock); for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); // Add new members when their URL notice arraives. - - if (find(left, left+nLeft, self) != left+nLeft) - broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } @@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) { h.rewatch(); } -void Cluster::disconnect(sys::DispatchHandle& h) { - h.stopWatch(); - QPID_LOG(critical, "Disconnected from cluster, shutting down"); - broker->shutdown(); +void Cluster::disconnect(sys::DispatchHandle& ) { + // FIXME aconway 2008-09-11: this should be logged as critical, + // when we provide admin option to shut down cluster and let + // members leave cleanly. + QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + broker.shutdown(); } void Cluster::joining(const MemberId& m, const string& url) { - QPID_LOG(notice, "Cluster member " << m << " has URL " << url); + QPID_LOG(info, "Cluster member " << m << " has URL " << url); urls.insert(UrlMap::value_type(m,Url(url))); } void Cluster::ready(const MemberId& ) { // FIXME aconway 2008-09-08: TODO } - -}} // namespace qpid::cluster +// Called from Broker::~Broker when broker is shut down. At this +// point we know the poller has stopped so no poller callbacks will be +// invoked. We must ensure that CPG has also shut down so no CPG +// callbacks will be invoked. +// +void Cluster::shutdown() { + QPID_LOG(notice, "Cluster member " << self << " shutting down."); + try { cpg.shutdown(); } + catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } + delete this; +} +broker::Broker& Cluster::getBroker(){ return broker; } +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index a25b62ea12..3a254684ad 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -21,7 +21,7 @@ #include "qpid/cluster/Cpg.h" #include "qpid/cluster/Event.h" -#include "qpid/cluster/PollableQueue.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/cluster/NoOpConnectionOutputHandler.h" #include "qpid/broker/Broker.h" @@ -43,7 +43,7 @@ class Connection; * Connection to the cluster. * Keeps cluster membership data. */ -class Cluster : public RefCounted, private Cpg::Handler +class Cluster : private Cpg::Handler { public: @@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler void joining(const MemberId&, const std::string& url); void ready(const MemberId&); - broker::Broker& getBroker() { assert(broker); return *broker; } - MemberId getSelf() const { return self; } + void shutdown(); + + broker::Broker& getBroker(); + private: typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; - - /** Message sent over the cluster. */ - typedef std::pair<framing::AMQFrame, ConnectionId> Message; - typedef PollableQueue<Event> EventQueue; + typedef sys::PollableQueue<Event> EventQueue; boost::function<void()> shutdownNext; @@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); mutable sys::Monitor lock; // Protect access to members. - broker::Broker* broker; + broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; @@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; - PollableQueue<Event> deliverQueue; + EventQueue deliverQueue; friend std::ostream& operator <<(std::ostream&, const Cluster&); friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 31447f2fd0..f4128634a6 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin { ClusterValues values; ClusterOptions options; - boost::intrusive_ptr<Cluster> cluster; + Cluster* cluster; boost::scoped_ptr<ConnectionCodec::Factory> factory; - ClusterPlugin() : options(values) {} + ClusterPlugin() : options(values), cluster(0) {} Options* getOptions() { return &options; } @@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin { if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified. QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin."); cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker); - broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this)); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster))); } void earlyInitialize(Plugin::Target&) {} - - void shutdown() { cluster = 0; } }; static ClusterPlugin instance; // Static initialization. // For test purposes. -boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; } +Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; } }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 506e982ffd..68d1b16dfa 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) { deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. } + +void Connection::sessionState(const SequenceNumber& /*replayStart*/, + const SequenceSet& /*sentIncomplete*/, + const SequenceNumber& /*expected*/, + const SequenceNumber& /*received*/, + const SequenceSet& /*unknownCompleted*/, + const SequenceSet& /*receivedIncomplete*/) +{ + // FIXME aconway 2008-09-10: TODO +} + +void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) +{ + // FIXME aconway 2008-09-10: TODO +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index b3e151ce51..a30350585f 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -40,9 +40,7 @@ namespace framing { class AMQFrame; } namespace cluster { -/** - * Plug-in associated with broker::Connections, both local and shadow. - */ +/** Intercept broker::Connection calls for shadow and local cluster connections. */ class Connection : public RefCounted, public sys::ConnectionInputHandler, @@ -90,16 +88,13 @@ class Connection : sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient); // State dump methods. - virtual void sessionState(const framing::SequenceNumber& /*replayId*/, - const framing::SequenceNumber& /*sendId*/, - const framing::SequenceSet& /*sentIncomplete*/, - const framing::SequenceNumber& /*expectedId*/, - const framing::SequenceNumber& /*receivedId*/, - const framing::SequenceSet& /*unknownCompleted*/, - const framing::SequenceSet& /*receivedIncomplete*/) {} + virtual void sessionState(const SequenceNumber& replayStart, + const SequenceSet& sentIncomplete, + const SequenceNumber& expected, + const SequenceNumber& received, + const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - virtual void shadowReady(uint64_t /*clusterId*/, - const std::string& /*userId*/) {} + virtual void shadowReady(uint64_t memberId, uint64_t connectionId); private: void sendDoOutput(); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index f093a0cc1c..6179eab724 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -30,16 +30,16 @@ namespace cluster { sys::ConnectionCodec* ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) { - if (v == framing::ProtocolVersion(0, 10)) + if (v == framing::ProtocolVersion(0, 10)) return new ConnectionCodec(out, id, cluster); return 0; } +// FIXME aconway 2008-08-27: outbound connections need to be made +// with proper qpid::client code for failover, get rid of this +// broker-side hack. sys::ConnectionCodec* ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - // FIXME aconway 2008-08-27: outbound connections need to be made - // with proper qpid::client code for failover, get rid of this - // broker-side hack. return next->create(out, id); } diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 59ce20d821..22d752d174 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec { struct Factory : public sys::ConnectionCodec::Factory { boost::shared_ptr<sys::ConnectionCodec::Factory> next; Cluster& cluster; - Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {} + Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) + : next(f), cluster(c) {} sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/cluster/PollableCondition.cpp deleted file mode 100644 index eecf95ff8d..0000000000 --- a/cpp/src/qpid/cluster/PollableCondition.cpp +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP -#define QPID_SYS_LINUX_POLLABLECONDITION_CPP - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. -// - -#include "qpid/sys/posix/PrivatePosix.h" -#include "qpid/cluster/PollableCondition.h" -#include "qpid/Exception.h" - -#include <unistd.h> -#include <fcntl.h> - -namespace qpid { -namespace cluster { - -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { - int fds[2]; - if (::pipe(fds) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - impl->fd = fds[0]; - writeFd = fds[1]; - if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); - if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1) - throw ErrnoException(QPID_MSG("Can't create PollableCondition")); -} - -bool PollableCondition::clear() { - char buf[256]; - ssize_t n; - bool wasSet = false; - while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0) - wasSet = true; - if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition")); - return wasSet; -} - -void PollableCondition::set() { - static const char dummy=0; - ssize_t n = ::write(writeFd, &dummy, 1); - if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition"); -} - - -#if 0 -// FIXME aconway 2008-08-12: More efficient Linux implementation using -// eventfd system call. Do a configure.ac test to enable this when -// eventfd is available. - -#include <sys/eventfd.h> - -namespace qpid { -namespace cluster { - -PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) { - impl->fd = ::eventfd(0, 0); - if (impl->fd < 0) throw ErrnoException("conditionfd() failed"); -} - -bool PollableCondition::clear() { - char buf[8]; - ssize_t n = ::read(impl->fd, buf, 8); - if (n != 8) throw ErrnoException("read failed on conditionfd"); - return *reinterpret_cast<uint64_t*>(buf); -} - -void PollableCondition::set() { - static const uint64_t value=1; - ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8); - if (n != 8) throw ErrnoException("write failed on conditionfd"); -} - -#endif - -}} // namespace qpid::cluster - -#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/cluster/PollableCondition.h deleted file mode 100644 index 6bfca6cabe..0000000000 --- a/cpp/src/qpid/cluster/PollableCondition.h +++ /dev/null @@ -1,60 +0,0 @@ -#ifndef QPID_SYS_POLLABLECONDITION_H -#define QPID_SYS_POLLABLECONDITION_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/IOHandle.h" - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to sys namespace in common lib. -// - -namespace qpid { -namespace cluster { - -/** - * A pollable condition to integrate in-process conditions with IO - * conditions in a polling loop. - * - * Setting the condition makes it readable for a poller. - * - * Writable/disconnected conditions are undefined and should not be - * polled for. - */ -class PollableCondition : public sys::IOHandle { - public: - PollableCondition(); - - /** Set the condition, triggers readable in a poller. */ - void set(); - - /** Get the current state of the condition, then clear it. - *@return The state of the condition before it was cleared. - */ - bool clear(); - - private: - int writeFd; -}; -}} // namespace qpid::cluster - -#endif /*!QPID_SYS_POLLABLECONDITION_H*/ diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h deleted file mode 100644 index 1c7720f5c6..0000000000 --- a/cpp/src/qpid/cluster/PollableQueue.h +++ /dev/null @@ -1,113 +0,0 @@ -#ifndef QPID_CLUSTER_POLLABLEQUEUE_H -#define QPID_CLUSTER_POLLABLEQUEUE_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/cluster/PollableCondition.h" -#include "qpid/sys/Dispatcher.h" -#include "qpid/sys/Mutex.h" -#include <boost/function.hpp> -#include <boost/bind.hpp> -#include <algorithm> -#include <deque> - -namespace qpid { - -namespace sys { class Poller; } - -namespace cluster { - -// FIXME aconway 2008-08-11: this could be of more general interest, -// move to common lib. - -/** - * A queue that can be polled by sys::Poller. Any thread can push to - * the queue, on wakeup the poller thread processes all items on the - * queue by passing them to a callback in a batch. - */ -template <class T> -class PollableQueue { - typedef std::deque<T> Queue; - - public: - typedef typename Queue::iterator iterator; - - /** Callback to process a range of items. */ - typedef boost::function<void (const iterator&, const iterator&)> Callback; - - /** Functor tempalate to create a Callback from a functor that handles a single item. */ - template <class F> struct ForEach { - F handleOne; - ForEach(const F& f) : handleOne(f) {} - void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); } - }; - /** Function to create ForEach instances */ - template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); } - - /** When the queue is selected by the poller, values are passed to callback cb. */ - explicit PollableQueue(const Callback& cb); - - /** Push a value onto the queue. Thread safe */ - void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); } - - /** Start polling. */ - void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); } - - /** Stop polling. */ - void stop() { handle.stopWatch(); } - - private: - typedef sys::Mutex::ScopedLock ScopedLock; - typedef sys::Mutex::ScopedUnlock ScopedUnlock; - - void dispatch(sys::DispatchHandle&); - - sys::Mutex lock; - Callback callback; - PollableCondition condition; - sys::DispatchHandle handle; - Queue queue; - Queue batch; -}; - -template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12: - : callback(cb), - handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0) -{} - -template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) { - ScopedLock l(lock); // Lock for concurrent push() - batch.clear(); - batch.swap(queue); - condition.clear(); - { - // Process outside the lock to allow concurrent push. - ScopedUnlock u(lock); - callback(batch.begin(), batch.end()); - h.rewatch(); - } - batch.clear(); -} - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ |
