summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/FailoverExchange.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
committerAlan Conway <aconway@apache.org>2011-06-15 20:15:51 +0000
commit4cdf746f5bb38db60821047c3393f89f15b26f1e (patch)
tree610a404288b464a2225668c128fa77a84020ea62 /cpp/src/qpid/cluster/FailoverExchange.cpp
parent8034affaba71c0d991bfe1fff5de537f73d0f404 (diff)
downloadqpid-python-4cdf746f5bb38db60821047c3393f89f15b26f1e.tar.gz
QPID-3280: Performance problem with TTL messages.
When sending a large number of messages with nonzero TTLs to a cluster, overall message throughput drops by around 20-30% compared to messages with TTL 0. The previous approach to TTL in the cluster is replaced with a simpler "cluster clock". Also QueueCleaner is executed in the cluster timer, and modified to be deterministic in a cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1136170 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/FailoverExchange.cpp')
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.cpp26
1 files changed, 16 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/FailoverExchange.cpp b/cpp/src/qpid/cluster/FailoverExchange.cpp
index 84232dac1b..cfbe34a460 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.cpp
+++ b/cpp/src/qpid/cluster/FailoverExchange.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,10 @@ using namespace broker;
using namespace framing;
const string FailoverExchange::typeName("amq.failover");
-
-FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b) : Exchange(typeName, parent, b ) {
+
+FailoverExchange::FailoverExchange(management::Manageable* parent, Broker* b)
+ : Exchange(typeName, parent, b ), ready(false)
+{
if (mgmtExchange != 0)
mgmtExchange->set_type(typeName);
}
@@ -53,16 +55,17 @@ void FailoverExchange::setUrls(const vector<Url>& u) {
void FailoverExchange::updateUrls(const vector<Url>& u) {
Lock l(lock);
urls=u;
- if (urls.empty()) return;
- std::for_each(queues.begin(), queues.end(),
- boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ if (ready && !urls.empty()) {
+ std::for_each(queues.begin(), queues.end(),
+ boost::bind(&FailoverExchange::sendUpdate, this, _1));
+ }
}
string FailoverExchange::getType() const { return typeName; }
bool FailoverExchange::bind(Queue::shared_ptr queue, const string&, const framing::FieldTable*) {
Lock l(lock);
- sendUpdate(queue);
+ if (ready) sendUpdate(queue);
return queues.insert(queue).second;
}
@@ -84,7 +87,7 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
// Called with lock held.
if (urls.empty()) return;
framing::Array array(0x95);
- for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
+ for (Urls::const_iterator i = urls.begin(); i != urls.end(); ++i)
array.add(boost::shared_ptr<Str16Value>(new Str16Value(i->str())));
const ProtocolVersion v;
boost::intrusive_ptr<Message> msg(new Message);
@@ -96,9 +99,12 @@ void FailoverExchange::sendUpdate(const Queue::shared_ptr& queue) {
header.get<MessageProperties>(true)->getApplicationHeaders().setArray(typeName, array);
AMQFrame headerFrame(header);
headerFrame.setFirstSegment(false);
- msg->getFrames().append(headerFrame);
+ msg->getFrames().append(headerFrame);
DeliverableMessage(msg).deliverTo(queue);
}
+void FailoverExchange::setReady() {
+ ready = true;
+}
}} // namespace cluster