diff options
Diffstat (limited to 'cpp/src/qpid/cluster/FailoverExchange.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/FailoverExchange.cpp | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp index 84232dac1b..cfbe34a460 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.cpp +++ b/cpp/src/qpid/cluster/FailoverExchange.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -39,8 +39,10 @@ using namespace broker; using namespace framing; const string FailoverExchange::typeName("amq.failover"); - -FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) { + +FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) + : Exchange(typeName, parent, b ), ready(false) +{ if (mgmtExchange != 0) mgmtExchange->set_type(typeName); } @@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) { void FailoverExchange::updateUrls(const vector<Url>& u) { Lock l(lock); urls=u; - if (urls.empty()) return; - std::for_each(queues.begin(), queues.end(), - boost::bind(&FailoverExchange::sendUpdate, this, _1)); + if (ready && !urls.empty()) { + std::for_each(queues.begin(), queues.end(), + boost::bind(&FailoverExchange::sendUpdate, this, _1)); + } } string FailoverExchange::getType() const { return typeName; } bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) { Lock l(lock); - sendUpdate(queue); + if (ready) sendUpdate(queue); return queues.insert(queue).second; } @@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { // Called with lock held. if (urls.empty()) return; framing::Array array(0x95); - for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) + for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i) array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str()))); const ProtocolVersion v; boost::intrusive_ptr<Message> msg(new Message); @@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) { header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array); AMQFrame headerFrame(header); headerFrame.setFirstSegment(false); - msg->getFrames().append(headerFrame); + msg->getFrames().append(headerFrame); DeliverableMessage(msg).deliverTo(queue); } +void FailoverExchange::setReady() { + ready = true; +} }} // namespace cluster |