diff options
| author | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
| commit | 0fb7ff9cfbfd01e9093c2c6021a5915696d2a089 (patch) | |
| tree | 1d2db335592be80a9aa9f8f404d2c1682afeb485 /cpp/src/qpid/cluster/Multicaster.cpp | |
| parent | 1ee447563d208b39e962537a47f14aea741777b0 (diff) | |
| download | qpid-python-0fb7ff9cfbfd01e9093c2c6021a5915696d2a089.tar.gz | |
Support for restarting a persistent cluster.
Option --cluster-size=N: members wait for N members before recovering store.
Stores marked as clean/dirty. Automatically recover from clean store on restart.
Stores marked with UUID to detect errors.
Not yet implemented: consistency checks, manual recovery from all dirty stores.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@883842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 72fc1533f8..229d7edb1e 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -33,36 +33,41 @@ Multicaster::Multicaster(Cpg& cpg_, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true) + ready(false) { queue.start(); } void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { - QPID_LOG(trace, "MCAST " << id << ": " << body); mcast(Event::control(body, id)); } void Multicaster::mcastControl(const framing::AMQFrame& frame, const ConnectionId& id) { - QPID_LOG(trace, "MCAST " << id << ": " << frame); mcast(Event::control(frame, id)); } void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); - QPID_LOG(trace, "MCAST " << e); mcast(e); } void Multicaster::mcast(const Event& e) { { sys::Mutex::ScopedLock l(lock); - if (e.isConnection() && holding) { - holdingQueue.push_back(e); + if (!ready) { + if (e.isConnection()) holdingQueue.push_back(e); + else { + iovec iov = e.toIovec(); + // FIXME aconway 2009-11-23: configurable retry --cluster-retry + if (!cpg.mcast(&iov, 1)) + throw Exception("CPG flow control error during initialization"); + QPID_LOG(trace, "MCAST (direct) " << e); + } return; } } + QPID_LOG(trace, "MCAST " << e); queue.push(e); } @@ -88,9 +93,9 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co } } -void Multicaster::release() { +void Multicaster::setReady() { sys::Mutex::ScopedLock l(lock); - holding = false; + ready = true; std::for_each(holdingQueue.begin(), holdingQueue.end(), boost::bind(&Multicaster::mcast, this, _1)); holdingQueue.clear(); } |
