diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2009-07-13 15:00:58 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2009-07-13 15:00:58 +0000 |
| commit | 15daf8342812786490f8a8dabcc5ba3cee8593e6 (patch) | |
| tree | 69ce95f978a7c6b7cfded0f025ffce7b1dd5d220 /cpp/src/qpid/broker | |
| parent | b7ec99208bb38dc0cad3a7fd42b8e652610a192a (diff) | |
| download | qpid-python-15daf8342812786490f8a8dabcc5ba3cee8593e6.tar.gz | |
Reverted checkins 793119, 793120, 793121, 793122 because of problems with heartbeats and the store tests.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@793602 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxManager.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxManager.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/DtxTimeout.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/LinkRegistry.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/QueueCleaner.h | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 7 |
13 files changed, 83 insertions, 69 deletions
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 1e0ac64e01..8f4621bb39 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -36,6 +36,7 @@ #include "QueueEvents.h" #include "Vhost.h" #include "System.h" +#include "Timer.h" #include "ExpiryPolicy.h" #include "qpid/management/Manageable.h" #include "qpid/management/ManagementAgent.h" @@ -48,7 +49,6 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/Timer.h" #include "qpid/RefCounted.h" #include "AclModule.h" @@ -112,14 +112,13 @@ public: std::string knownHosts; uint32_t maxSessionRate; }; - + private: typedef std::map<std::string, boost::shared_ptr<sys::ProtocolFactory> > ProtocolFactoryMap; void declareStandardExchange(const std::string& name, const std::string& type); boost::shared_ptr<sys::Poller> poller; - sys::Timer timer; Options config; ProtocolFactoryMap protocolFactories; std::auto_ptr<MessageStore> store; @@ -130,6 +129,7 @@ public: ExchangeRegistry exchanges; LinkRegistry links; boost::shared_ptr<sys::ConnectionCodec::Factory> factory; + Timer timer; DtxManager dtxManager; SessionManager sessionManager; management::ManagementAgent* managementAgent; @@ -145,6 +145,8 @@ public: boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; public: + + virtual ~Broker(); QPID_BROKER_EXTERN Broker(const Options& configuration); @@ -183,7 +185,7 @@ public: void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } - + SessionManager& getSessionManager() { return sessionManager; } const std::string& getFederationTag() const { return federationTag; } @@ -192,7 +194,7 @@ public: management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - + /** Add to the broker's protocolFactorys */ void registerProtocolFactory(const std::string& name, boost::shared_ptr<sys::ProtocolFactory>); @@ -224,7 +226,7 @@ public: boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; } void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; } - sys::Timer& getTimer() { return timer; } + Timer& getTimer() { return timer; } boost::function<std::vector<Url> ()> getKnownBrokers; @@ -237,5 +239,7 @@ public: }; }} + + #endif /*!_Broker_*/ diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index c0e9429ba9..a54bcc6db9 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -49,25 +49,35 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { -struct ConnectionTimeoutTask : public sys::TimerTask { - sys::Timer& timer; +struct ConnectionTimeoutTask : public TimerTask { + Timer& timer; Connection& connection; + AbsTime expires; - ConnectionTimeoutTask(uint16_t hb, sys::Timer& t, Connection& c) : + ConnectionTimeoutTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*2*TIME_SEC)), timer(t), - connection(c) + connection(c), + expires(AbsTime::now(), duration) {} - void touch() { - restart(); + void touch() + { + expires = AbsTime(AbsTime::now(), duration); } void fire() { - // If we get here then we've not received any traffic in the timeout period - // Schedule closing the connection for the io thread - QPID_LOG(error, "Connection timed out: closing"); - connection.abort(); + // This is the best we can currently do to avoid a destruction/fire race + if (isCancelled()) return; + if (expires < AbsTime::now()) { + // If we get here then we've not received any traffic in the timeout period + // Schedule closing the connection for the io thread + QPID_LOG(error, "Connection timed out: closing"); + connection.abort(); + } else { + reset(); + timer.add(this); + } } }; @@ -328,22 +338,25 @@ void Connection::setSecureConnection(SecureConnection* s) adapter.setSecureConnection(s); } -struct ConnectionHeartbeatTask : public sys::TimerTask { - sys::Timer& timer; +struct ConnectionHeartbeatTask : public TimerTask { + Timer& timer; Connection& connection; - ConnectionHeartbeatTask(uint16_t hb, sys::Timer& t, Connection& c) : + ConnectionHeartbeatTask(uint16_t hb, Timer& t, Connection& c) : TimerTask(Duration(hb*TIME_SEC)), timer(t), connection(c) {} void fire() { - // Setup next firing - setupNextFire(); - timer.add(this); - - // Send Heartbeat - connection.sendHeartbeat(); + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Setup next firing + reset(); + timer.add(this); + + // Send Heartbeat + connection.sendHeartbeat(); + } } }; diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index f3a6cb2b7a..17bc8f0970 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -152,8 +152,8 @@ class Connection : public sys::ConnectionInputHandler, qmf::org::apache::qpid::broker::Connection* mgmtObject; LinkRegistry& links; management::ManagementAgent* agent; - sys::Timer& timer; - boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; + Timer& timer; + boost::intrusive_ptr<TimerTask> heartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; bool shadow; diff --git a/cpp/src/qpid/broker/DtxManager.cpp b/cpp/src/qpid/broker/DtxManager.cpp index a9bdb5e152..11e16ec837 100644 --- a/cpp/src/qpid/broker/DtxManager.cpp +++ b/cpp/src/qpid/broker/DtxManager.cpp @@ -33,7 +33,7 @@ using qpid::ptr_map_ptr; using namespace qpid::broker; using namespace qpid::framing; -DtxManager::DtxManager(sys::Timer& t) : store(0), timer(t) {} +DtxManager::DtxManager(Timer& t) : store(0), timer(t) {} DtxManager::~DtxManager() {} @@ -130,7 +130,8 @@ void DtxManager::setTimeout(const std::string& xid, uint32_t secs) } timeout = intrusive_ptr<DtxTimeout>(new DtxTimeout(secs, *this, xid)); record->setTimeout(timeout); - timer.add(timeout); + timer.add(boost::static_pointer_cast<TimerTask>(timeout)); + } uint32_t DtxManager::getTimeout(const std::string& xid) diff --git a/cpp/src/qpid/broker/DtxManager.h b/cpp/src/qpid/broker/DtxManager.h index bfea5e6daf..a61e8610f0 100644 --- a/cpp/src/qpid/broker/DtxManager.h +++ b/cpp/src/qpid/broker/DtxManager.h @@ -24,9 +24,9 @@ #include <boost/ptr_container/ptr_map.hpp> #include "DtxBuffer.h" #include "DtxWorkRecord.h" +#include "Timer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" -#include "qpid/sys/Timer.h" #include "qpid/sys/Mutex.h" namespace qpid { @@ -35,7 +35,7 @@ namespace broker { class DtxManager{ typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap; - struct DtxCleanup : public sys::TimerTask + struct DtxCleanup : public TimerTask { DtxManager& mgr; const std::string& xid; @@ -47,14 +47,14 @@ class DtxManager{ WorkMap work; TransactionalStore* store; qpid::sys::Mutex lock; - qpid::sys::Timer& timer; + Timer& timer; void remove(const std::string& xid); DtxWorkRecord* getWork(const std::string& xid); DtxWorkRecord* createWork(std::string xid); public: - DtxManager(qpid::sys::Timer&); + DtxManager(Timer&); ~DtxManager(); void start(const std::string& xid, DtxBuffer::shared_ptr work); void join(const std::string& xid, DtxBuffer::shared_ptr work); diff --git a/cpp/src/qpid/broker/DtxTimeout.h b/cpp/src/qpid/broker/DtxTimeout.h index 680a210e4f..6e949eab0d 100644 --- a/cpp/src/qpid/broker/DtxTimeout.h +++ b/cpp/src/qpid/broker/DtxTimeout.h @@ -22,7 +22,7 @@ #define _DtxTimeout_ #include "qpid/Exception.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" namespace qpid { namespace broker { @@ -31,12 +31,12 @@ class DtxManager; struct DtxTimeoutException : public Exception {}; -struct DtxTimeout : public sys::TimerTask +struct DtxTimeout : public TimerTask { const uint32_t timeout; DtxManager& mgr; const std::string xid; - + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); void fire(); }; diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index a82e828138..c6e10f0f8c 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -36,12 +36,10 @@ namespace _qmf = qmf::org::apache::qpid::broker; #define LINK_MAINT_INTERVAL 2 -LinkRegistry::LinkRegistry (Broker* _broker) : - broker(_broker), timer(broker->getTimer()), - parent(0), store(0), passive(false), passiveChanged(false), - realm(broker ? broker->getOptions().realm : "") +LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false), + realm(broker ? broker->getOptions().realm : "") { - timer.add (new Periodic(*this)); + timer.add (intrusive_ptr<TimerTask> (new Periodic(*this))); } LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : @@ -50,7 +48,7 @@ LinkRegistry::Periodic::Periodic (LinkRegistry& _links) : void LinkRegistry::Periodic::fire () { links.periodicMaintenance (); - links.timer.add (new Periodic(links)); + links.timer.add (intrusive_ptr<TimerTask> (new Periodic(links))); } void LinkRegistry::periodicMaintenance () diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 1caffb9232..07fff5b979 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -25,9 +25,9 @@ #include <map> #include "Bridge.h" #include "MessageStore.h" +#include "Timer.h" #include "qpid/Address.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include <boost/shared_ptr.hpp> @@ -41,7 +41,7 @@ namespace broker { // Declare a timer task to manage the establishment of link connections and the // re-establishment of lost link connections. - struct Periodic : public sys::TimerTask + struct Periodic : public TimerTask { LinkRegistry& links; @@ -62,7 +62,7 @@ namespace broker { qpid::sys::Mutex lock; Broker* broker; - sys::Timer& timer; + Timer timer; management::Manageable* parent; MessageStore* store; bool passive; diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index f5b0163c0f..62b546b3eb 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -49,7 +49,7 @@ public: }; NullMessageStore::NullMessageStore() : nextPersistenceId(1) { - QPID_LOG(info, "No message store configured, persistence is disabled."); + QPID_LOG(info, "No message store configured, persistence is disabled.") } bool NullMessageStore::init(const Options* /*options*/) {return true;} diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp index 814eca6751..0774dce2b7 100644 --- a/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/cpp/src/qpid/broker/QueueCleaner.cpp @@ -26,15 +26,15 @@ namespace qpid { namespace broker { -QueueCleaner::QueueCleaner(QueueRegistry& q, sys::Timer& t) : queues(q), timer(t) {} +QueueCleaner::QueueCleaner(QueueRegistry& q, Timer& t) : queues(q), timer(t) {} void QueueCleaner::start(qpid::sys::Duration p) { - task = new Task(*this, p); + task = boost::intrusive_ptr<TimerTask>(new Task(*this, p)); timer.add(task); } -QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : sys::TimerTask(d), parent(p) {} +QueueCleaner::Task::Task(QueueCleaner& p, qpid::sys::Duration d) : TimerTask(d), parent(p) {} void QueueCleaner::Task::fire() { @@ -44,7 +44,7 @@ void QueueCleaner::Task::fire() void QueueCleaner::fired() { queues.eachQueue(boost::bind(&Queue::purgeExpired, _1)); - task->setupNextFire(); + task->reset(); timer.add(task); } diff --git a/cpp/src/qpid/broker/QueueCleaner.h b/cpp/src/qpid/broker/QueueCleaner.h index 0fbb12a5d4..007826f33e 100644 --- a/cpp/src/qpid/broker/QueueCleaner.h +++ b/cpp/src/qpid/broker/QueueCleaner.h @@ -23,7 +23,7 @@ */ #include "BrokerImportExport.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" namespace qpid { namespace broker { @@ -35,10 +35,10 @@ class QueueRegistry; class QueueCleaner { public: - QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, sys::Timer& timer); + QPID_BROKER_EXTERN QueueCleaner(QueueRegistry& queues, Timer& timer); QPID_BROKER_EXTERN void start(qpid::sys::Duration period); private: - class Task : public sys::TimerTask + class Task : public TimerTask { public: Task(QueueCleaner& parent, qpid::sys::Duration duration); @@ -46,10 +46,10 @@ class QueueCleaner private: QueueCleaner& parent; }; - - boost::intrusive_ptr<sys::TimerTask> task; + + boost::intrusive_ptr<TimerTask> task; QueueRegistry& queues; - sys::Timer& timer; + Timer& timer; void fired(); }; diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 2b8048ea3d..b465a65bd3 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -25,7 +25,7 @@ #include "SessionManager.h" #include "SessionHandler.h" #include "RateFlowcontrol.h" -#include "qpid/sys/Timer.h" +#include "Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -49,7 +49,6 @@ using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::AbsTime; -//using qpid::sys::Timer; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( @@ -207,10 +206,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN } } -struct ScheduledCreditTask : public sys::TimerTask { - sys::Timer& timer; +struct ScheduledCreditTask : public TimerTask { + Timer& timer; SessionState& sessionState; - ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t, + ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, SessionState& s) : TimerTask(d), timer(t), @@ -219,13 +218,15 @@ struct ScheduledCreditTask : public sys::TimerTask { void fire() { // This is the best we can currently do to avoid a destruction/fire race - sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); + if (!isCancelled()) { + sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this)); + } } void sendCredit() { if ( !sessionState.processSendCredit(0) ) { QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); - setupNextFire(); + reset(); timer.add(this); } } @@ -268,7 +269,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) if (rateFlowcontrol && frame.getBof() && frame.getBos()) { if ( !processSendCredit(1) ) { QPID_LOG(debug, getId() << ": Schedule sending credit"); - sys::Timer& timer = getBroker().getTimer(); + Timer& timer = getBroker().getTimer(); // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); flowControlTimer = new ScheduledCreditTask(d, timer, *this); diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index 1d000fca5f..f9d35e2aac 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -48,10 +48,6 @@ namespace framing { class AMQP_ClientProxy; } -namespace sys { -struct TimerTask; -} - namespace broker { class Broker; @@ -60,6 +56,7 @@ class Message; class SessionHandler; class SessionManager; class RateFlowcontrol; +struct TimerTask; /** * Broker-side session state includes session's handler chains, which @@ -156,7 +153,7 @@ class SessionState : public qpid::SessionState, // State used for producer flow control (rate limited) qpid::sys::Mutex rateLock; boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; - boost::intrusive_ptr<sys::TimerTask> flowControlTimer; + boost::intrusive_ptr<TimerTask> flowControlTimer; friend class SessionManager; }; |
