diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2012-12-21 17:04:33 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2012-12-21 17:04:33 +0000 |
| commit | 2079387cbd734d013c2c6a2ae515b0430a5d512e (patch) | |
| tree | fe620c22c1e6c22cde2df43ca5edaf4f440c641e /qpid/cpp/src | |
| parent | af21f1bdc2a944eb3b6993f159e8e3ff8bca1629 (diff) | |
| download | qpid-python-2079387cbd734d013c2c6a2ae515b0430a5d512e.tar.gz | |
NO-JIRA: Removed Timer.h out of the chain of header files included by Broker.h
(since that is widely included) to avoid unnecessary recompilation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1425037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Connection.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.cpp | 41 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DtxManager.h | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/DtxWorkRecord.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.cpp | 33 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueCleaner.h | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 1 |
16 files changed, 128 insertions, 77 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 15382ae80d..b9d528011f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -67,6 +67,7 @@ #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -196,6 +197,7 @@ framing::FieldTable noReplicateArgs() { Broker::Broker(const Broker::Options& conf) : poller(new Poller), + timer(new qpid::sys::Timer), config(conf), managementAgent(conf.enableMgmt ? new ManagementAgent(conf.qmf1Support, conf.qmf2Support) @@ -207,13 +209,13 @@ Broker::Broker(const Broker::Options& conf) : exchanges(this), links(this), factory(new SecureConnectionFactory(*this)), - dtxManager(timer), + dtxManager(*timer.get()), sessionManager( qpid::SessionState::Configuration( conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), - queueCleaner(queues, &timer), + queueCleaner(queues, timer.get()), recoveryInProgress(false), expiryPolicy(new ExpiryPolicy), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) @@ -437,7 +439,7 @@ Broker::~Broker() { finalize(); // Finalize any plugins. if (config.auth) SaslAuthenticator::fini(); - timer.stop(); + timer->stop(); QPID_LOG(notice, "Shut down"); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 9a5204dbf6..5c9c9edb12 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -52,7 +52,6 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/Timer.h" #include "qpid/types/Variant.h" #include "qpid/RefCounted.h" #include "qpid/broker/AclModule.h" @@ -67,6 +66,7 @@ namespace qpid { namespace sys { class ProtocolFactory; class Poller; +class Timer; } struct Url; @@ -157,7 +157,7 @@ class Broker : public sys::Runnable, public Plugin::Target, Manageable::status_t setTimestampConfig(const bool receive, const ConnectionState* context); boost::shared_ptr<sys::Poller> poller; - sys::Timer timer; + std::auto_ptr<sys::Timer> timer; Options config; std::auto_ptr<management::ManagementAgent> managementAgent; ProtocolFactoryMap protocolFactories; @@ -265,7 +265,7 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller(); /** Timer for local tasks affecting only this broker */ - sys::Timer& getTimer() { return timer; } + sys::Timer& getTimer() { return *timer; } boost::function<std::vector<Url> ()> getKnownBrokers; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp index 43d7b04783..f6185d56a4 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/Connection.cpp @@ -26,6 +26,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" #include "qpid/ptr_map.h" diff --git a/qpid/cpp/src/qpid/broker/DtxManager.cpp b/qpid/cpp/src/qpid/broker/DtxManager.cpp index d482c2c327..5233e07b2b 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.cpp +++ b/qpid/cpp/src/qpid/broker/DtxManager.cpp @@ -27,6 +27,9 @@ #include "qpid/ptr_map.h" #include <boost/format.hpp> +#include <boost/bind.hpp> +#include <boost/function.hpp> + #include <iostream> using boost::intrusive_ptr; @@ -35,6 +38,30 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; +namespace { + typedef boost::function0<void> FireFunction; + struct DtxCleanup : public qpid::sys::TimerTask + { + FireFunction fireFunction; + + DtxCleanup(uint32_t timeout, FireFunction f); + void fire(); + }; + + DtxCleanup::DtxCleanup(uint32_t _timeout, FireFunction f) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), fireFunction(f){} + + void DtxCleanup::fire() + { + try { + fireFunction(); + } catch (qpid::ConnectionException& /*e*/) { + //assume it was explicitly cleaned up after a call to prepare, commit or rollback + } + } + +} + DtxManager::DtxManager(qpid::sys::Timer& t) : store(0), timer(&t) {} DtxManager::~DtxManager() {} @@ -156,19 +183,7 @@ void DtxManager::timedout(const std::string& xid) } else { ptr_map_ptr(i)->timedout(); //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion? - //timer.add(intrusive_ptr<TimerTask>(new DtxCleanup(60*30/*30 mins*/, *this, xid))); - } -} - -DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) - : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC),"DtxCleanup"), mgr(_mgr), xid(_xid) {} - -void DtxManager::DtxCleanup::fire() -{ - try { - mgr.remove(xid); - } catch (ConnectionException& /*e*/) { - //assume it was explicitly cleaned up after a call to prepare, commit or rollback + //timer->add(new DtxCleanup(60*30/*30 mins*/, boost::bind(&DtxManager::remove, this, xid))); } } diff --git a/qpid/cpp/src/qpid/broker/DtxManager.h b/qpid/cpp/src/qpid/broker/DtxManager.h index 8f76790720..81175e5dc3 100644 --- a/qpid/cpp/src/qpid/broker/DtxManager.h +++ b/qpid/cpp/src/qpid/broker/DtxManager.h @@ -31,20 +31,15 @@ #include "qpid/ptr_map.h" namespace qpid { +namespace sys { +class Timer; +} + namespace broker { class DtxManager{ typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap; - struct DtxCleanup : public sys::TimerTask - { - DtxManager& mgr; - const std::string& xid; - - DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); - void fire(); - }; - WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp index 2c7df95fc6..ad02892895 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp @@ -20,7 +20,10 @@ */ #include "qpid/broker/DtxWorkRecord.h" #include "qpid/broker/DtxManager.h" +#include "qpid/broker/DtxTimeout.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Timer.h" + #include <boost/format.hpp> #include <boost/mem_fn.hpp> using boost::mem_fn; @@ -39,6 +42,12 @@ DtxWorkRecord::~DtxWorkRecord() } } +void DtxWorkRecord::setTimeout(boost::intrusive_ptr<DtxTimeout> t) +{ timeout = t; } + +boost::intrusive_ptr<DtxTimeout> DtxWorkRecord::getTimeout() +{ return timeout; } + bool DtxWorkRecord::prepare() { Mutex::ScopedLock locker(lock); diff --git a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h index 4a34c079dd..db2cb28f6f 100644 --- a/qpid/cpp/src/qpid/broker/DtxWorkRecord.h +++ b/qpid/cpp/src/qpid/broker/DtxWorkRecord.h @@ -23,7 +23,6 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/DtxBuffer.h" -#include "qpid/broker/DtxTimeout.h" #include "qpid/broker/TransactionalStore.h" #include "qpid/framing/amqp_types.h" @@ -38,6 +37,8 @@ namespace qpid { namespace broker { +class DtxTimeout; + /** * Represents the work done under a particular distributed transaction * across potentially multiple channels. Identified by a xid. Allows @@ -71,8 +72,8 @@ public: QPID_BROKER_EXTERN void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops); void timedout(); - void setTimeout(boost::intrusive_ptr<DtxTimeout> t) { timeout = t; } - boost::intrusive_ptr<DtxTimeout> getTimeout() { return timeout; } + void setTimeout(boost::intrusive_ptr<DtxTimeout> t); + boost::intrusive_ptr<DtxTimeout> getTimeout(); std::string getXid() const { return xid; } bool isCompleted() const { return completed; } bool isRolledback() const { return rolledback; } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 5d75583de1..9a0e4a96f4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -45,6 +45,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/ArgsQueuePurge.h" #include "qmf/org/apache/qpid/broker/ArgsQueueReroute.h" diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 602ed6931b..ef4d956826 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -38,7 +38,6 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" #include "qmf/org/apache/qpid/broker/Broker.h" @@ -56,6 +55,9 @@ #include <algorithm> namespace qpid { +namespace sys { +class TimerTask; +} namespace broker { class Broker; class Exchange; diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp index 838bc28be8..8d9e3f43dd 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.cpp @@ -18,15 +18,36 @@ * under the License. * */ -#include "qpid/broker/Queue.h" #include "qpid/broker/QueueCleaner.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/sys/Timer.h" + +#include <boost/function.hpp> #include <boost/bind.hpp> namespace qpid { namespace broker { +namespace { + typedef boost::function0<void> FireFunction; + class Task : public sys::TimerTask + { + public: + Task(FireFunction f, sys::Duration duration); + void fire(); + private: + FireFunction fireFunction; + }; + + Task::Task(FireFunction f, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), fireFunction(f) {} + + void Task::fire() + { + fireFunction(); + } +} QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer* t) : queues(q), timer(t) {} QueueCleaner::~QueueCleaner() @@ -37,7 +58,7 @@ QueueCleaner::~QueueCleaner() void QueueCleaner::start(qpid::sys::Duration p) { period = p; - task = new Task(*this, p); + task = new Task(boost::bind(&QueueCleaner::fired, this), p); timer->add(task); } @@ -45,14 +66,6 @@ void QueueCleaner::setTimer(qpid::sys::Timer* timer) { this->timer = timer; } - -QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d,"QueueCleaner"), parent(p) {} - -void QueueCleaner::Task::fire() -{ - parent.fired(); -} - namespace { struct CollectQueues { diff --git a/qpid/cpp/src/qpid/broker/QueueCleaner.h b/qpid/cpp/src/qpid/broker/QueueCleaner.h index ffebfe3e1b..896af1dcd5 100644 --- a/qpid/cpp/src/qpid/broker/QueueCleaner.h +++ b/qpid/cpp/src/qpid/broker/QueueCleaner.h @@ -23,9 +23,17 @@ */ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/sys/Timer.h" +#include "qpid/sys/Time.h" + +#include <boost/intrusive_ptr.hpp> namespace qpid { + +namespace sys { + class Timer; + class TimerTask; +} + namespace broker { class QueueRegistry; @@ -39,16 +47,8 @@ class QueueCleaner QPID_BROKER_EXTERN ~QueueCleaner(); QPID_BROKER_EXTERN void start(sys::Duration period); QPID_BROKER_EXTERN void setTimer(sys::Timer* timer); - private: - class Task : public sys::TimerTask - { - public: - Task(QueueCleaner& parent, sys::Duration duration); - void fire(); - private: - QueueCleaner& parent; - }; + private: boost::intrusive_ptr<sys::TimerTask> task; QueueRegistry& queues; sys::Timer* timer; diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index b679aebbfa..a065e18a76 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -17,6 +17,7 @@ */ #include "qpid/broker/SessionAdapter.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Queue.h" #include "qpid/Exception.h" #include "qpid/framing/reply_exceptions.h" diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index d7c24c6187..39954bb3ee 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -41,6 +41,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/intrusive_ptr.hpp> +#include <queue> #include <set> #include <vector> #include <ostream> diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 991dac9139..9be07d1ca2 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -33,6 +33,7 @@ #include "qpid/framing/FieldValue.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/Thread.h" #include "qpid/sys/PollableQueue.h" #include "qpid/broker/ConnectionState.h" @@ -47,6 +48,9 @@ #include <sstream> #include <typeinfo> +#include <boost/bind.hpp> +#include <boost/function.hpp> + namespace qpid { namespace management { @@ -92,6 +96,32 @@ struct ScopedManagementContext setManagementExecutionContext(0); } }; + +typedef boost::function0<void> FireFunction; +struct Periodic : public qpid::sys::TimerTask +{ + FireFunction fireFunction; + qpid::sys::Timer* timer; + + Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t seconds); + virtual ~Periodic (); + void fire (); +}; + +Periodic::Periodic (FireFunction f, qpid::sys::Timer* t, uint32_t _seconds) + : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), + fireFunction(f), timer(t) {} + +Periodic::~Periodic() {} + +void Periodic::fire() +{ + setupNextFire(); + timer->add(this); + fireFunction(); +} + } @@ -171,7 +201,7 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t new EventQueue(boost::bind(&ManagementAgent::sendEvents, this, _1), broker->getPoller())); sendQueue->start(); timer = &broker->getTimer(); - timer->add(new Periodic(*this, interval)); + timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval)); // Get from file or generate and save to file. if (dataDir.empty()) @@ -415,20 +445,6 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi } } -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), - "ManagementAgent::periodicProcessing"), - agent(_agent) {} - -ManagementAgent::Periodic::~Periodic() {} - -void ManagementAgent::Periodic::fire() -{ - setupNextFire(); - agent.timer->add(this); - agent.periodicProcessing(); -} - void ManagementAgent::clientAdded (const string& routingKey) { sys::Mutex::ScopedLock lock(userLock); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index 48ed2f7566..0953493388 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -26,7 +26,6 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" @@ -47,6 +46,9 @@ namespace qpid { namespace broker { class ConnectionState; } +namespace sys { +class Timer; +} namespace management { class ManagementAgent @@ -176,15 +178,6 @@ public: void importDeletedObjects( const DeletedObjectList& inList ); private: - struct Periodic : public qpid::sys::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - // Storage for tracking remote management agents, attached via the client // management agent API. // diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index 3dfe3863f4..1177bf7119 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/reply_exceptions.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" +#include "qpid/sys/Timer.h" #include <iostream> #include <vector> |
