summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Multicaster.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-03-12 20:11:31 +0000
committerAlan Conway <aconway@apache.org>2010-03-12 20:11:31 +0000
commitef9268528d3147173dfb0d2ef707ee3e4fc4f210 (patch)
tree4d8a9851683812bd04392f57c695a5143c80ca79 /cpp/src/qpid/cluster/Multicaster.cpp
parent937fe6e7295efff28cb680642fca28ebf65e7d4e (diff)
downloadqpid-python-ef9268528d3147173dfb0d2ef707ee3e4fc4f210.tar.gz
New cluster member pushes store when joining an active cluster.
Previously a broker with a clean store would not be able to join an active cluster because the shtudown-id did not match. This commit ensures that when a broker joins an active cluster, it always pushes its store regardless of status. Clean/dirty status is only compared when forming an initial cluster. This change required splitting initialization into two phases: PRE_INIT: occurs in the Cluster ctor during early-initialize. This phase determines whether or not to push the store. INIT: occurs after Cluster::initialize and does the remaining initialization chores. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@922412 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Multicaster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp21
1 files changed, 15 insertions, 6 deletions
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 4a8195438f..d57ff76941 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -33,10 +33,8 @@ Multicaster::Multicaster(Cpg& cpg_,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- ready(false)
-{
- queue.start();
-}
+ ready(false), bypass(true)
+{}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
mcast(Event::control(body, id));
@@ -61,10 +59,16 @@ void Multicaster::mcast(const Event& e) {
}
}
QPID_LOG(trace, "MCAST " << e);
- queue.push(e);
+ if (bypass) { // direct, don't queue
+ iovec iov = e.toIovec();
+ // FIXME aconway 2010-03-10: should do limited retry.
+ while (!cpg.mcast(&iov, 1))
+ ;
+ }
+ else
+ queue.push(e);
}
-
Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
try {
PollableEventQueue::Batch::const_iterator i = values.begin();
@@ -86,6 +90,11 @@ Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(co
}
}
+void Multicaster::start() {
+ queue.start();
+ bypass = false;
+}
+
void Multicaster::setReady() {
sys::Mutex::ScopedLock l(lock);
ready = true;