summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp66
-rw-r--r--cpp/src/qpid/cluster/Cluster.h19
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp9
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp16
-rw-r--r--cpp/src/qpid/cluster/Connection.h19
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp8
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h3
-rw-r--r--cpp/src/qpid/cluster/PollableCondition.cpp100
-rw-r--r--cpp/src/qpid/cluster/PollableCondition.h60
-rw-r--r--cpp/src/qpid/cluster/PollableQueue.h113
10 files changed, 79 insertions, 334 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index ce156e85e4..07ed4596e0 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -61,7 +61,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
- broker(&b),
+ broker(b),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -74,15 +74,17 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
),
deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, _1)))
{
- broker->addFinalizer(boost::bind(&Cluster::leave, this));
- QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
+ QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str());
+ broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
cpg.join(name);
deliverQueue.start(poller);
cpgDispatchHandle.startWatch(poller);
}
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+ QPID_LOG(debug, "~Cluster()");
+}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
Mutex::ScopedLock l(lock);
@@ -94,20 +96,13 @@ void Cluster::erase(ConnectionId id) {
connections.erase(id);
}
+// FIXME aconway 2008-09-10: leave is currently not called,
+// It should be called if we are shut down by a cluster admin command.
+// Any other type of exit is caught in disconnect().
+//
void Cluster::leave() {
- Mutex::ScopedLock l(lock);
- if (!broker) return; // Already left.
- // Leave is called by from Broker destructor after the poller has
- // been shut down. No dispatches can occur.
-
- QPID_LOG(notice, "Leaving cluster " << name.str());
+ QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str());
cpg.leave(name);
- // broker= is set to 0 when the final config-change is delivered.
- while(broker) {
- Mutex::ScopedUnlock u(lock);
- cpg.dispatchAll();
- }
- cpg.shutdown();
}
template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -177,6 +172,7 @@ void Cluster::deliver(
{
try {
MemberId from(nodeid, pid);
+ QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); // FIXME aconway 2008-09-10:
deliverQueue.push(Event::delivered(from, msg, msg_len));
}
catch (const std::exception& e) {
@@ -238,7 +234,7 @@ void Cluster::configChange(
cpg_address *left, int nLeft,
cpg_address *joined, int nJoined)
{
- QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
+ QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: "
<< AddrList(joined, nJoined) << AddrList(left, nLeft));
if (nJoined) // Notfiy new members of my URL.
@@ -246,13 +242,14 @@ void Cluster::configChange(
AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), url.str())),
ConnectionId(self,0));
-
+ if (find(left, left+nLeft, self) != left+nLeft) {
+ // We have left the group, this is the final config change.
+ QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str());
+ broker.shutdown();
+ }
Mutex::ScopedLock l(lock);
for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
// Add new members when their URL notice arraives.
-
- if (find(left, left+nLeft, self) != left+nLeft)
- broker = 0; // We have left the group, this is the final config change.
lock.notifyAll(); // Threads waiting for membership changes.
}
@@ -261,22 +258,35 @@ void Cluster::dispatch(sys::DispatchHandle& h) {
h.rewatch();
}
-void Cluster::disconnect(sys::DispatchHandle& h) {
- h.stopWatch();
- QPID_LOG(critical, "Disconnected from cluster, shutting down");
- broker->shutdown();
+void Cluster::disconnect(sys::DispatchHandle& ) {
+ // FIXME aconway 2008-09-11: this should be logged as critical,
+ // when we provide admin option to shut down cluster and let
+ // members leave cleanly.
+ QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str());
+ broker.shutdown();
}
void Cluster::joining(const MemberId& m, const string& url) {
- QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
+ QPID_LOG(info, "Cluster member " << m << " has URL " << url);
urls.insert(UrlMap::value_type(m,Url(url)));
}
void Cluster::ready(const MemberId& ) {
// FIXME aconway 2008-09-08: TODO
}
-
-}} // namespace qpid::cluster
+// Called from Broker::~Broker when broker is shut down. At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+//
+void Cluster::shutdown() {
+ QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+ try { cpg.shutdown(); }
+ catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
+ delete this;
+}
+broker::Broker& Cluster::getBroker(){ return broker; }
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index a25b62ea12..3a254684ad 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -21,7 +21,7 @@
#include "qpid/cluster/Cpg.h"
#include "qpid/cluster/Event.h"
-#include "qpid/cluster/PollableQueue.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/cluster/NoOpConnectionOutputHandler.h"
#include "qpid/broker/Broker.h"
@@ -43,7 +43,7 @@ class Connection;
* Connection to the cluster.
* Keeps cluster membership data.
*/
-class Cluster : public RefCounted, private Cpg::Handler
+class Cluster : private Cpg::Handler
{
public:
@@ -78,17 +78,16 @@ class Cluster : public RefCounted, private Cpg::Handler
void joining(const MemberId&, const std::string& url);
void ready(const MemberId&);
- broker::Broker& getBroker() { assert(broker); return *broker; }
-
MemberId getSelf() const { return self; }
+ void shutdown();
+
+ broker::Broker& getBroker();
+
private:
typedef std::map<MemberId, Url> UrlMap;
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
-
- /** Message sent over the cluster. */
- typedef std::pair<framing::AMQFrame, ConnectionId> Message;
- typedef PollableQueue<Event> EventQueue;
+ typedef sys::PollableQueue<Event> EventQueue;
boost::function<void()> shutdownNext;
@@ -127,7 +126,7 @@ class Cluster : public RefCounted, private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
mutable sys::Monitor lock; // Protect access to members.
- broker::Broker* broker;
+ broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
Cpg::Name name;
@@ -137,7 +136,7 @@ class Cluster : public RefCounted, private Cpg::Handler
ConnectionMap connections;
NoOpConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableQueue<Event> deliverQueue;
+ EventQueue deliverQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 31447f2fd0..f4128634a6 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -66,10 +66,10 @@ struct ClusterPlugin : public Plugin {
ClusterValues values;
ClusterOptions options;
- boost::intrusive_ptr<Cluster> cluster;
+ Cluster* cluster;
boost::scoped_ptr<ConnectionCodec::Factory> factory;
- ClusterPlugin() : options(values) {}
+ ClusterPlugin() : options(values), cluster(0) {}
Options* getOptions() { return &options; }
@@ -78,20 +78,17 @@ struct ClusterPlugin : public Plugin {
if (!broker || values.name.empty()) return; // Only if --cluster-name option was specified.
QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of cluster plugin.");
cluster = new Cluster(values.name, values.getUrl(broker->getPort()), *broker);
- broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
}
void earlyInitialize(Plugin::Target&) {}
-
- void shutdown() { cluster = 0; }
};
static ClusterPlugin instance; // Static initialization.
// For test purposes.
-boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
+Cluster& getGlobalCluster() { assert(instance.cluster); return *instance.cluster; }
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 506e982ffd..68d1b16dfa 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -106,5 +106,21 @@ void Connection::deliverBuffer(Buffer& buf) {
deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
}
+
+void Connection::sessionState(const SequenceNumber& /*replayStart*/,
+ const SequenceSet& /*sentIncomplete*/,
+ const SequenceNumber& /*expected*/,
+ const SequenceNumber& /*received*/,
+ const SequenceSet& /*unknownCompleted*/,
+ const SequenceSet& /*receivedIncomplete*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
+{
+ // FIXME aconway 2008-09-10: TODO
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index b3e151ce51..a30350585f 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -40,9 +40,7 @@ namespace framing { class AMQFrame; }
namespace cluster {
-/**
- * Plug-in associated with broker::Connections, both local and shadow.
- */
+/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
public RefCounted,
public sys::ConnectionInputHandler,
@@ -90,16 +88,13 @@ class Connection :
sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, const std::string& id, bool isClient);
// State dump methods.
- virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
- const framing::SequenceNumber& /*sendId*/,
- const framing::SequenceSet& /*sentIncomplete*/,
- const framing::SequenceNumber& /*expectedId*/,
- const framing::SequenceNumber& /*receivedId*/,
- const framing::SequenceSet& /*unknownCompleted*/,
- const framing::SequenceSet& /*receivedIncomplete*/) {}
+ virtual void sessionState(const SequenceNumber& replayStart,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- virtual void shadowReady(uint64_t /*clusterId*/,
- const std::string& /*userId*/) {}
+ virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
private:
void sendDoOutput();
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index f093a0cc1c..6179eab724 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -30,16 +30,16 @@ namespace cluster {
sys::ConnectionCodec*
ConnectionCodec::Factory::create(framing::ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
- if (v == framing::ProtocolVersion(0, 10))
+ if (v == framing::ProtocolVersion(0, 10))
return new ConnectionCodec(out, id, cluster);
return 0;
}
+// FIXME aconway 2008-08-27: outbound connections need to be made
+// with proper qpid::client code for failover, get rid of this
+// broker-side hack.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- // FIXME aconway 2008-08-27: outbound connections need to be made
- // with proper qpid::client code for failover, get rid of this
- // broker-side hack.
return next->create(out, id);
}
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 59ce20d821..22d752d174 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -50,7 +50,8 @@ class ConnectionCodec : public sys::ConnectionCodec {
struct Factory : public sys::ConnectionCodec::Factory {
boost::shared_ptr<sys::ConnectionCodec::Factory> next;
Cluster& cluster;
- Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c) : next(f), cluster(c) {}
+ Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
+ : next(f), cluster(c) {}
sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
diff --git a/cpp/src/qpid/cluster/PollableCondition.cpp b/cpp/src/qpid/cluster/PollableCondition.cpp
deleted file mode 100644
index eecf95ff8d..0000000000
--- a/cpp/src/qpid/cluster/PollableCondition.cpp
+++ /dev/null
@@ -1,100 +0,0 @@
-#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP
-#define QPID_SYS_LINUX_POLLABLECONDITION_CPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to common lib.
-//
-
-#include "qpid/sys/posix/PrivatePosix.h"
-#include "qpid/cluster/PollableCondition.h"
-#include "qpid/Exception.h"
-
-#include <unistd.h>
-#include <fcntl.h>
-
-namespace qpid {
-namespace cluster {
-
-PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
- int fds[2];
- if (::pipe(fds) == -1)
- throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
- impl->fd = fds[0];
- writeFd = fds[1];
- if (::fcntl(impl->fd, F_SETFL, O_NONBLOCK) == -1)
- throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
- if (::fcntl(writeFd, F_SETFL, O_NONBLOCK) == -1)
- throw ErrnoException(QPID_MSG("Can't create PollableCondition"));
-}
-
-bool PollableCondition::clear() {
- char buf[256];
- ssize_t n;
- bool wasSet = false;
- while ((n = ::read(impl->fd, buf, sizeof(buf))) > 0)
- wasSet = true;
- if (n == -1 && errno != EAGAIN) throw ErrnoException(QPID_MSG("Error clearing PollableCondition"));
- return wasSet;
-}
-
-void PollableCondition::set() {
- static const char dummy=0;
- ssize_t n = ::write(writeFd, &dummy, 1);
- if (n == -1 && errno != EAGAIN) throw ErrnoException("Error setting PollableCondition");
-}
-
-
-#if 0
-// FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call. Do a configure.ac test to enable this when
-// eventfd is available.
-
-#include <sys/eventfd.h>
-
-namespace qpid {
-namespace cluster {
-
-PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
- impl->fd = ::eventfd(0, 0);
- if (impl->fd < 0) throw ErrnoException("conditionfd() failed");
-}
-
-bool PollableCondition::clear() {
- char buf[8];
- ssize_t n = ::read(impl->fd, buf, 8);
- if (n != 8) throw ErrnoException("read failed on conditionfd");
- return *reinterpret_cast<uint64_t*>(buf);
-}
-
-void PollableCondition::set() {
- static const uint64_t value=1;
- ssize_t n = ::write(impl->fd, reinterpret_cast<const void*>(&value), 8);
- if (n != 8) throw ErrnoException("write failed on conditionfd");
-}
-
-#endif
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/
diff --git a/cpp/src/qpid/cluster/PollableCondition.h b/cpp/src/qpid/cluster/PollableCondition.h
deleted file mode 100644
index 6bfca6cabe..0000000000
--- a/cpp/src/qpid/cluster/PollableCondition.h
+++ /dev/null
@@ -1,60 +0,0 @@
-#ifndef QPID_SYS_POLLABLECONDITION_H
-#define QPID_SYS_POLLABLECONDITION_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/IOHandle.h"
-
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to sys namespace in common lib.
-//
-
-namespace qpid {
-namespace cluster {
-
-/**
- * A pollable condition to integrate in-process conditions with IO
- * conditions in a polling loop.
- *
- * Setting the condition makes it readable for a poller.
- *
- * Writable/disconnected conditions are undefined and should not be
- * polled for.
- */
-class PollableCondition : public sys::IOHandle {
- public:
- PollableCondition();
-
- /** Set the condition, triggers readable in a poller. */
- void set();
-
- /** Get the current state of the condition, then clear it.
- *@return The state of the condition before it was cleared.
- */
- bool clear();
-
- private:
- int writeFd;
-};
-}} // namespace qpid::cluster
-
-#endif /*!QPID_SYS_POLLABLECONDITION_H*/
diff --git a/cpp/src/qpid/cluster/PollableQueue.h b/cpp/src/qpid/cluster/PollableQueue.h
deleted file mode 100644
index 1c7720f5c6..0000000000
--- a/cpp/src/qpid/cluster/PollableQueue.h
+++ /dev/null
@@ -1,113 +0,0 @@
-#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
-#define QPID_CLUSTER_POLLABLEQUEUE_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/cluster/PollableCondition.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Mutex.h"
-#include <boost/function.hpp>
-#include <boost/bind.hpp>
-#include <algorithm>
-#include <deque>
-
-namespace qpid {
-
-namespace sys { class Poller; }
-
-namespace cluster {
-
-// FIXME aconway 2008-08-11: this could be of more general interest,
-// move to common lib.
-
-/**
- * A queue that can be polled by sys::Poller. Any thread can push to
- * the queue, on wakeup the poller thread processes all items on the
- * queue by passing them to a callback in a batch.
- */
-template <class T>
-class PollableQueue {
- typedef std::deque<T> Queue;
-
- public:
- typedef typename Queue::iterator iterator;
-
- /** Callback to process a range of items. */
- typedef boost::function<void (const iterator&, const iterator&)> Callback;
-
- /** Functor tempalate to create a Callback from a functor that handles a single item. */
- template <class F> struct ForEach {
- F handleOne;
- ForEach(const F& f) : handleOne(f) {}
- void operator()(const iterator& i, const iterator& j) const { std::for_each(i, j, handleOne); }
- };
- /** Function to create ForEach instances */
- template <class F> static ForEach<F> forEach(const F& f) { return ForEach<F>(f); }
-
- /** When the queue is selected by the poller, values are passed to callback cb. */
- explicit PollableQueue(const Callback& cb);
-
- /** Push a value onto the queue. Thread safe */
- void push(const T& t) { ScopedLock l(lock); queue.push_back(t); condition.set(); }
-
- /** Start polling. */
- void start(const boost::shared_ptr<sys::Poller>& poller) { handle.startWatch(poller); }
-
- /** Stop polling. */
- void stop() { handle.stopWatch(); }
-
- private:
- typedef sys::Mutex::ScopedLock ScopedLock;
- typedef sys::Mutex::ScopedUnlock ScopedUnlock;
-
- void dispatch(sys::DispatchHandle&);
-
- sys::Mutex lock;
- Callback callback;
- PollableCondition condition;
- sys::DispatchHandle handle;
- Queue queue;
- Queue batch;
-};
-
-template <class T> PollableQueue<T>::PollableQueue(const Callback& cb) // FIXME aconway 2008-08-12:
- : callback(cb),
- handle(condition, boost::bind(&PollableQueue<T>::dispatch, this, _1), 0, 0)
-{}
-
-template <class T> void PollableQueue<T>::dispatch(sys::DispatchHandle& h) {
- ScopedLock l(lock); // Lock for concurrent push()
- batch.clear();
- batch.swap(queue);
- condition.clear();
- {
- // Process outside the lock to allow concurrent push.
- ScopedUnlock u(lock);
- callback(batch.begin(), batch.end());
- h.rewatch();
- }
- batch.clear();
-}
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/