summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-10-14 11:06:41 +0000
committerGordon Sim <gsim@apache.org>2008-10-14 11:06:41 +0000
commit137eaf938a010d8bb5d248c094891f7a2925ef55 (patch)
treec5760a56966c1b48ba50cc516d1cbb8f42d2a7d2 /cpp/src/qpid
parent76dc7ca3e92919d83932e66906425067652e76f5 (diff)
downloadqpid-python-137eaf938a010d8bb5d248c094891f7a2925ef55.tar.gz
Update to periodic purge of expired messages: check the dequeue rate to avoid interfering unnecessarily where the dequeing is sufficient to remove expired messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704461 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp26
-rw-r--r--cpp/src/qpid/broker/Queue.h2
-rw-r--r--cpp/src/qpid/broker/RateTracker.cpp48
-rw-r--r--cpp/src/qpid/broker/RateTracker.h57
4 files changed, 123 insertions, 10 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 355f822b57..1f508a1cc7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -405,19 +405,24 @@ QueuedMessage Queue::get(){
void Queue::purgeExpired()
{
- Messages expired;
- {
- Mutex::ScopedLock locker(messageLock);
- for (Messages::iterator i = messages.begin(); i != messages.end();) {
- if (i->payload->hasExpired()) {
- expired.push_back(*i);
- i = messages.erase(i);
- } else {
- ++i;
+ //As expired messages are discarded during dequeue also, only
+ //bother explicitly expiring if the rate of dequeues since last
+ //attempt is less than one per second.
+ if (dequeueTracker.sampleRatePerSecond() < 1) {
+ Messages expired;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ for (Messages::iterator i = messages.begin(); i != messages.end();) {
+ if (i->payload->hasExpired()) {
+ expired.push_back(*i);
+ i = messages.erase(i);
+ } else {
+ ++i;
+ }
}
}
+ for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
- for_each(expired.begin(), expired.end(), bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
/**
@@ -465,6 +470,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
lvq.erase(key);
}
messages.pop_front();
+ ++dequeueTracker;
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index fc628bbbc0..d9af63d3d9 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -27,6 +27,7 @@
#include "PersistableQueue.h"
#include "QueuePolicy.h"
#include "QueueBindings.h"
+#include "RateTracker.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/Monitor.h"
@@ -94,6 +95,7 @@ namespace qpid {
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
+ RateTracker dequeueTracker;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
diff --git a/cpp/src/qpid/broker/RateTracker.cpp b/cpp/src/qpid/broker/RateTracker.cpp
new file mode 100644
index 0000000000..5377bcfc0b
--- /dev/null
+++ b/cpp/src/qpid/broker/RateTracker.cpp
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "RateTracker.h"
+
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+
+namespace qpid {
+namespace broker {
+
+RateTracker::RateTracker() : currentCount(0), lastCount(0), lastTime(AbsTime::now()) {}
+
+RateTracker& RateTracker::operator++()
+{
+ ++currentCount;
+ return *this;
+}
+
+double RateTracker::sampleRatePerSecond()
+{
+ int32_t increment = currentCount - lastCount;
+ AbsTime now = AbsTime::now();
+ Duration interval(lastTime, now);
+ lastCount = currentCount;
+ lastTime = now;
+ return increment ? increment / (interval / TIME_SEC) : 0;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RateTracker.h b/cpp/src/qpid/broker/RateTracker.h
new file mode 100644
index 0000000000..0c20b37312
--- /dev/null
+++ b/cpp/src/qpid/broker/RateTracker.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_RATETRACKER_H
+#define QPID_BROKER_RATETRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Simple rate tracker: represents some value that can be incremented,
+ * then can periodcially sample the rate of increments.
+ */
+class RateTracker
+{
+ public:
+ RateTracker();
+ /**
+ * Increments the count being tracked. Can be called concurrently
+ * with other calls to this operator as well as with calls to
+ * sampleRatePerSecond().
+ */
+ RateTracker& operator++();
+ /**
+ * Returns the rate of increments per second since last
+ * called. Calls to this method should be serialised, but can be
+ * called concurrently with the increment operator
+ */
+ double sampleRatePerSecond();
+ private:
+ volatile int32_t currentCount;
+ int32_t lastCount;
+ qpid::sys::AbsTime lastTime;
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_RATETRACKER_H*/