diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 25 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Multicaster.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.cpp | 2 |
7 files changed, 24 insertions, 21 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 52f5e4872d..ad1f4b704d 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -187,14 +187,15 @@ void Cluster::deliver( Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); - deliver(Event::decode(from, buf), l); + Event e(Event::decode(from, buf)); + if (from == myId) // Record self-deliveries for flow control. + mcast.selfDeliver(e); + deliver(e, l); } void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; QPID_LOG(trace, *this << " PUSH: " << e); - if (e.getMemberId() == myId) - mcast.delivered(e); // Note delivery for flow control deliverQueue.push(e); } @@ -215,7 +216,7 @@ void Cluster::deliveredEvent(const Event& e) { if (e.isCluster()) { while (frame.decode(buf)) { QPID_LOG(trace, *this << " DLVR: " << e << " " << frame); - Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope is too big. + Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? ClusterDispatcher dispatch(*this, e.getMemberId(), l); if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); @@ -406,8 +407,6 @@ void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& } } -// FIXME aconway 2008-10-15: no longer need a separate control now -// that the dump control is in the deliver queue. void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { if (state == LEFT) return; assert(state == OFFER); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 7f3a9ac6aa..6e1d275162 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -43,7 +43,6 @@ struct ClusterValues { bool quorum; size_t readMax, writeEstimate, mcastMax; - // FIXME aconway 2008-12-09: revisit default. ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(10) {} Url getUrl(uint16_t port) const { diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 29e42ce534..8a8aa86c57 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -167,8 +167,6 @@ class Connection : framing::ChannelId currentChannel; boost::shared_ptr<broker::TxBuffer> txBuffer; - int FIXMEcredit; // FIXME aconway 2008-12-05: remove - friend std::ostream& operator<<(std::ostream&, const Connection&); }; diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp index abc7f5df6f..e438d958ea 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -79,7 +79,7 @@ void FailoverExchange::route(Deliverable&, const string& , const framing::FieldT void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; - framing::Array array(0x95); // FIXME aconway 2008-10-06: Array is unusable like this. Need type constants or better mapping. + framing::Array array(0x95); for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index b02fa16ae9..34614dc1ef 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -68,21 +68,27 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; - if (!cpg.mcast(&iov, 1)) - break; // cpg.mcast returns false for flow control - QPID_LOG(trace, " MCAST " << *i); - ++i; if (mcastMax) { sys::Mutex::ScopedLock l(lock); - assert(pending < mcastMax); - if (++pending == mcastMax) { + if (pending == mcastMax) { queue.stop(); break ; } + ++pending; + } + iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() }; + if (!cpg.mcast(&iov, 1)) { + // cpg didn't send because of CPG flow control. + if (mcastMax) { + sys::Mutex::ScopedLock l(lock); + --pending; + } + break; } + QPID_LOG(trace, " MCAST " << *i); + ++i; } - values.erase(values.begin(), i); + values.erase(values.begin(), i); // Erase sent events. } catch (const std::exception& e) { QPID_LOG(critical, "Multicast error: " << e.what()); @@ -98,9 +104,10 @@ void Multicaster::release() { holdingQueue.clear(); } -void Multicaster::delivered(const Event&) { +void Multicaster::selfDeliver(const Event&) { sys::Mutex::ScopedLock l(lock); if (mcastMax) { + assert(pending > 0); assert(pending <= mcastMax); if (pending == mcastMax) queue.start(); diff --git a/cpp/src/qpid/cluster/Multicaster.h b/cpp/src/qpid/cluster/Multicaster.h index ef8be8c229..8014cd8492 100644 --- a/cpp/src/qpid/cluster/Multicaster.h +++ b/cpp/src/qpid/cluster/Multicaster.h @@ -57,7 +57,7 @@ class Multicaster /** End holding mode, held events are mcast */ void release(); /** Call when events are self-delivered to manage flow control. */ - void delivered(const Event& e); + void selfDeliver(const Event&); private: typedef sys::PollableQueue<Event> PollableEventQueue; diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp index d5df758b40..edce1698ee 100644 --- a/cpp/src/qpid/cluster/Quorum_cman.cpp +++ b/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -35,7 +35,7 @@ void Quorum::init() { enable = true; cman = cman_init(0); if (cman == 0) throw ErrnoException("Can't connect to cman service"); - // FIXME aconway 2008-11-13: configure max wait. + // FIXME aconway 2008-11-13: configurable max wait. for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno)); sys::sleep(1); |
