diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/Makefile.am | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PeriodicTimer.h | 56 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PeriodicTimerImpl.cpp | 49 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/PeriodicTimerImpl.h | 45 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/Timer.h | 6 |
9 files changed, 172 insertions, 24 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index d0aae54be0..5886c53ff9 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -471,6 +471,9 @@ libqpidcommon_la_SOURCES += \ qpid/sys/TimeoutHandler.h \ qpid/sys/Timer.cpp \ qpid/sys/Timer.h \ + qpid/sys/PeriodicTimer.h \ + qpid/sys/PeriodicTimerImpl.h \ + qpid/sys/PeriodicTimerImpl.cpp \ qpid/sys/Waitable.h \ qpid/sys/alloca.h \ qpid/sys/uuid.h diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 9b05373144..f47b6418bd 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -49,6 +49,7 @@ #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/sys/PeriodicTimerImpl.h" #include "qpid/Address.h" #include "qpid/Url.h" #include "qpid/Version.h" @@ -136,6 +137,7 @@ const std::string knownHostsNone("none"); Broker::Broker(const Broker::Options& conf) : poller(new Poller), + periodicTimer(new sys::PeriodicTimerImpl(timer)), config(conf), managementAgent(conf.enableMgmt ? new ManagementAgent() : 0), store(new NullMessageStore), diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index b85aa7d96c..04d62306da 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -49,6 +49,7 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Timer.h" +#include "qpid/sys/PeriodicTimer.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" #include "qpid/sys/Mutex.h" @@ -146,6 +147,7 @@ public: boost::shared_ptr<sys::Poller> poller; sys::Timer timer; + std::auto_ptr<sys::PeriodicTimer> periodicTimer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -253,6 +255,8 @@ public: void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } sys::Timer& getTimer() { return timer; } + sys::PeriodicTimer& getPeriodicTimer() { return *periodicTimer; } + void setPeriodicTimer(std::auto_ptr<sys::PeriodicTimer> pt) { periodicTimer = pt; } boost::function<std::vector<Url> ()> getKnownBrokers; diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 378e150a0f..9f7d8046d4 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -91,10 +91,12 @@ void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, dataDir = _dataDir; interval = _interval; broker = _broker; - timer = &_broker->getTimer(); + timer = &_broker->getPeriodicTimer(); threadPoolSize = _threads; ManagementObject::maxThreads = threadPoolSize; - timer->add (new Periodic(*this, interval)); + timer->add (boost::bind(&ManagementAgent::periodicProcessing, this), + interval * sys::TIME_SEC, + "ManagementAgent::periodicProcessing"); // Get from file or generate and save to file. if (dataDir.empty()) @@ -231,17 +233,6 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi "console.event.1.0." + event.getPackageName() + "." + event.getEventName()); } -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration ((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC)), agent(_agent) {} - -ManagementAgent::Periodic::~Periodic () {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer->add (new Periodic (agent, agent.interval)); - agent.periodicProcessing (); -} - void ManagementAgent::clientAdded (const std::string& routingKey) { if (routingKey.find("console") != 0) @@ -332,6 +323,7 @@ void ManagementAgent::periodicProcessing (void) { #define BUFSIZE 65536 #define HEADROOM 4096 + QPID_LOG(trace, "Management agent periodic processing") Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index 1c06e5896b..3dea8ce3c7 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -26,7 +26,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" +#include "qpid/sys/PeriodicTimer.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" @@ -105,15 +105,6 @@ public: void importSchemas(framing::Buffer& inBuf); private: - struct Periodic : public qpid::sys::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - // Storage for tracking remote management agents, attached via the client // management agent API. // @@ -205,7 +196,7 @@ private: std::string dataDir; uint16_t interval; qpid::broker::Broker* broker; - qpid::sys::Timer* timer; + qpid::sys::PeriodicTimer* timer; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; diff --git a/cpp/src/qpid/sys/PeriodicTimer.h b/cpp/src/qpid/sys/PeriodicTimer.h new file mode 100644 index 0000000000..290ffb6218 --- /dev/null +++ b/cpp/src/qpid/sys/PeriodicTimer.h @@ -0,0 +1,56 @@ +#ifndef QPID_SYS_PERIODICTIMER_H +#define QPID_SYS_PERIODICTIMER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Timer.h" +#include <boost/function.hpp> + +namespace qpid { +namespace sys { + +/** + * Interface of a timer for periodic tasks that should be synchronized + * across all brokers in a periodic. The standalone broker + * implementation simply wraps qpid::sys::Timer. The clustered broker + * implementation synchronizes execution of periodic tasks on all + * periodic members. + */ +class PeriodicTimer +{ + public: + typedef boost::function<void()> Task; + + QPID_COMMON_EXTERN virtual ~PeriodicTimer() {} + + /** + * Add a named task to be executed at the given period. + * + * The task registered under the same name will be executed on + * all brokers at the given period. + */ + virtual void add(const Task& task, Duration period, const std::string& taskName) = 0; + +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_PERIODICTIMER_H*/ diff --git a/cpp/src/qpid/sys/PeriodicTimerImpl.cpp b/cpp/src/qpid/sys/PeriodicTimerImpl.cpp new file mode 100644 index 0000000000..e2a6dccf3e --- /dev/null +++ b/cpp/src/qpid/sys/PeriodicTimerImpl.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PeriodicTimerImpl.h" + +namespace qpid { +namespace sys { + +PeriodicTimerImpl::PeriodicTimerImpl(Timer& t) : timer(t) {} + +struct PeriodicTimerImpl::TaskImpl : public TimerTask { + Timer& timer; + Task task; + + TaskImpl(Timer& timer_, const Task& task_, Duration period) : + TimerTask(period), timer(timer_), task(task_) {} + + void fire() { + task(); + setupNextFire(); + timer.add(this); + } +}; + +void PeriodicTimerImpl::add( + const Task& task, Duration period, const std::string& /*taskName*/ +) +{ + timer.add(new TaskImpl(timer, task, period)); +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/PeriodicTimerImpl.h b/cpp/src/qpid/sys/PeriodicTimerImpl.h new file mode 100644 index 0000000000..51ce0ffdf9 --- /dev/null +++ b/cpp/src/qpid/sys/PeriodicTimerImpl.h @@ -0,0 +1,45 @@ +#ifndef QPID_SYS_PERIODICTIMERIMPL_H +#define QPID_SYS_PERIODICTIMERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "PeriodicTimer.h" + +namespace qpid { +namespace sys { + +/** + * Standalone broker implementation of PeriodicTimer. + */ +class PeriodicTimerImpl : public PeriodicTimer +{ + public: + PeriodicTimerImpl(Timer& timer); + void add(const Task& task, Duration period, const std::string& taskName); + + private: + struct TaskImpl; + Timer& timer; +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_PERIODICTIMER_H*/ diff --git a/cpp/src/qpid/sys/Timer.h b/cpp/src/qpid/sys/Timer.h index 5748503841..303d44a299 100644 --- a/cpp/src/qpid/sys/Timer.h +++ b/cpp/src/qpid/sys/Timer.h @@ -69,6 +69,12 @@ protected: bool operator<(const boost::intrusive_ptr<TimerTask>& a, const boost::intrusive_ptr<TimerTask>& b); +/** + A timer to trigger tasks that are local to one broker. + + For periodic tasks that should be synchronized across all brokers + in a cluster, use qpid::sys::PeriodicTimer. + */ class Timer : private Runnable { qpid::sys::Monitor monitor; std::priority_queue<boost::intrusive_ptr<TimerTask> > tasks; |
