summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp11
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp50
-rw-r--r--cpp/src/qpid/broker/DelegatingPeriodicTimer.h57
4 files changed, 121 insertions, 5 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index f47b6418bd..cd3b014256 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -137,7 +137,6 @@ const std::string knownHostsNone("none");
Broker::Broker(const Broker::Options& conf) :
poller(new Poller),
- periodicTimer(new sys::PeriodicTimerImpl(timer)),
config(conf),
managementAgent(conf.enableMgmt ? new ManagementAgent() : 0),
store(new NullMessageStore),
@@ -258,6 +257,12 @@ Broker::Broker(const Broker::Options& conf) :
// Initialize plugins
Plugin::initializeAll(*this);
+ if (!periodicTimer.hasDelegate()) {
+ // If no plugin has contributed a PeriodicTimer, use the default one.
+ periodicTimer.setDelegate(
+ std::auto_ptr<sys::PeriodicTimer>(new sys::PeriodicTimerImpl(timer)));
+ }
+
if (conf.queueCleanInterval) {
queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
}
@@ -469,6 +474,10 @@ Broker::getKnownBrokersImpl()
return knownBrokers;
}
+void Broker::setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) {
+ periodicTimer.setDelegate(pt);
+}
+
const std::string Broker::TCP_TRANSPORT("tcp");
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 04d62306da..302ef74e3d 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -25,6 +25,7 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/ConnectionFactory.h"
#include "qpid/broker/ConnectionToken.h"
+#include "qpid/broker/DelegatingPeriodicTimer.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
@@ -49,7 +50,6 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Timer.h"
-#include "qpid/sys/PeriodicTimer.h"
#include "qpid/RefCounted.h"
#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
@@ -147,7 +147,7 @@ public:
boost::shared_ptr<sys::Poller> poller;
sys::Timer timer;
- std::auto_ptr<sys::PeriodicTimer> periodicTimer;
+ DelegatingPeriodicTimer periodicTimer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -255,8 +255,8 @@ public:
void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
sys::Timer& getTimer() { return timer; }
- sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; }
- void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; }
+ sys::PeriodicTimer& getPeriodicTimer() { return periodicTimer; }
+ void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt);
boost::function<std::vector<Url> ()> getKnownBrokers;
diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
new file mode 100644
index 0000000000..111d968543
--- /dev/null
+++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DelegatingPeriodicTimer.h"
+
+namespace qpid {
+namespace broker {
+
+DelegatingPeriodicTimer::DelegatingPeriodicTimer() {}
+
+void DelegatingPeriodicTimer::add(
+ const Task& task, sys::Duration period, const std::string& taskName)
+{
+ if (delegate.get())
+ delegate->add(task, period, taskName);
+ else {
+ Entry e;
+ e.task = task;
+ e.period = period;
+ e.name = taskName;
+ entries.push_back(e);
+ }
+}
+
+void DelegatingPeriodicTimer::setDelegate(std::auto_ptr<PeriodicTimer> impl) {
+ assert(impl.get());
+ assert(!delegate.get());
+ delegate = impl;
+ for (Entries::iterator i = entries.begin(); i != entries.end(); ++i)
+ delegate->add(i->task, i->period, i->name);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/DelegatingPeriodicTimer.h b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
new file mode 100644
index 0000000000..5186f41c3e
--- /dev/null
+++ b/cpp/src/qpid/broker/DelegatingPeriodicTimer.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_PERIODICTIMER_H
+#define QPID_BROKER_PERIODICTIMER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/PeriodicTimer.h"
+#include <vector>
+#include <memory>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A PeriodicTimer implementation that delegates to another PeriodicTimer.
+ *
+ * Tasks added while there is no delegate timer are stored.
+ * When a delgate timer is set, stored tasks are added to it.
+ */
+class DelegatingPeriodicTimer : public sys::PeriodicTimer
+{
+ public:
+ DelegatingPeriodicTimer();
+ /** Add a task: if no delegate, store it. When delegate set, add stored tasks */
+ void add(const Task& task, sys::Duration period, const std::string& taskName);
+ /** Set the delegate, transfers ownership of delegate. */
+ void setDelegate(std::auto_ptr<PeriodicTimer> delegate);
+ bool hasDelegate() { return delegate.get(); }
+ private:
+ struct Entry { Task task; sys::Duration period; std::string name; };
+ typedef std::vector<Entry> Entries;
+ std::auto_ptr<PeriodicTimer> delegate;
+ Entries entries;
+
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PERIODICTIMER_H*/