summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
1 files changed, 9 insertions, 8 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 2b8048ea3d..b465a65bd3 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -25,7 +25,7 @@
#include "SessionManager.h"
#include "SessionHandler.h"
#include "RateFlowcontrol.h"
-#include "qpid/sys/Timer.h"
+#include "Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -49,7 +49,6 @@ using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::AbsTime;
-//using qpid::sys::Timer;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
@@ -207,10 +206,10 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN
}
}
-struct ScheduledCreditTask : public sys::TimerTask {
- sys::Timer& timer;
+struct ScheduledCreditTask : public TimerTask {
+ Timer& timer;
SessionState& sessionState;
- ScheduledCreditTask(const qpid::sys::Duration& d, sys::Timer& t,
+ ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
SessionState& s) :
TimerTask(d),
timer(t),
@@ -219,13 +218,15 @@ struct ScheduledCreditTask : public sys::TimerTask {
void fire() {
// This is the best we can currently do to avoid a destruction/fire race
- sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ if (!isCancelled()) {
+ sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+ }
}
void sendCredit() {
if ( !sessionState.processSendCredit(0) ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
- setupNextFire();
+ reset();
timer.add(this);
}
}
@@ -268,7 +269,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
if ( !processSendCredit(1) ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
- sys::Timer& timer = getBroker().getTimer();
+ Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
flowControlTimer = new ScheduledCreditTask(d, timer, *this);