summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/SessionState.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
commit2dff9493ceb62d37a3b70a4abd6bc0539bdb581e (patch)
tree0003c4766da8f32e8fc2b9f5b4968391b7319492 /cpp/src/qpid/broker/SessionState.cpp
parent3f547381f1af5cdb9d7c5f9cc30f7303d643afd9 (diff)
downloadqpid-python-2dff9493ceb62d37a3b70a4abd6bc0539bdb581e.tar.gz
Producer side rate throttling:
This uses the Message.Flow command to send credit from broker to client to ensure that the client doesn't exceed a rate configured on the broker per session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/SessionState.cpp')
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp110
1 files changed, 93 insertions, 17 deletions
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 0a24a39d38..5039b31874 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,8 @@
#include "DeliveryRecord.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "RateFlowcontrol.h"
+#include "Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -31,6 +33,7 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementBroker.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
@@ -46,17 +49,19 @@ using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::sys::AbsTime;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
- Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
- mgmtObject(0)
+ mgmtObject(0),
+ rateFlowcontrol(0)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -71,12 +76,19 @@ SessionState::SessionState(
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ if (flowControlTimer)
+ flowControlTimer->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -102,7 +114,7 @@ void SessionState::detach() {
mgmtObject->set_attached (0);
}
-void SessionState::disableOutput()
+void SessionState::disableOutput()
{
semanticState.detached();//prevents further activateOutput calls until reattached
getConnection().outputTasks.removeOutputTask(&semanticState);
@@ -120,12 +132,12 @@ void SessionState::attach(SessionHandler& h) {
}
void SessionState::activateOutput() {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
void SessionState::giveReadCredit(int32_t credit) {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.giveReadCredit(credit);
}
@@ -170,18 +182,49 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
+ if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
}
}
+struct ScheduledCreditTask : public TimerTask {
+ Timer& timer;
+ SessionState& sessionState;
+ RateFlowcontrol& flowControl;
+ ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+ SessionState& s, RateFlowcontrol& f) :
+ TimerTask(d),
+ timer(t),
+ sessionState(s),
+ flowControl(f)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Send credit
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = flowControl.receivedMessage(now, 0);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
+ sessionState.getProxy().getMessage().flow("", 0, sendCredit);
+ flowControl.sentCredit(now, sendCredit);
+ } else if ( flowControl.flowStopped() ) {
+ QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+ reset();
+ timer.add(this);
+ }
+ }
+ }
+};
+
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
@@ -194,10 +237,10 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- msg->getFrames().append(header);
+ msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
- semanticState.handle(msg);
+ semanticState.handle(msg);
msgBuilder.end();
if (msg->isEnqueueComplete()) {
@@ -206,14 +249,39 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
incomplete.add(msg);
}
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
+ //hold up execution until async enqueue is complete
+ if (msg->getFrames().getMethod()->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
} else {
incomplete.process(enqueuedOp, false);
}
}
+
+ // Handle producer session flow control
+ if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
+ // Check for violating flow control
+ if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(warning, getId() << ": producer throttling violation");
+ // TODO: Probably do message.stop("") first time then disconnect
+ getProxy().getMessage().stop("");
+ } else {
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ } else if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(debug, getId() << ": Schedule sending credit");
+ Timer& timer = getBroker().getTimer();
+ // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+ sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+ flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
+ timer.add(flowControlTimer);
+ }
+ }
+ }
}
void SessionState::sendAcceptAndCompletion()
@@ -222,7 +290,7 @@ void SessionState::sendAcceptAndCompletion()
getProxy().getMessage().accept(accepted);
accepted.clear();
}
- sendCompletion();
+ sendCompletion();
}
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
@@ -240,7 +308,7 @@ void SessionState::handleIn(AMQFrame& frame) {
if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
} else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
+ handleCommand(frame.getMethod(), commandId);
} else {
throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
@@ -265,8 +333,8 @@ void SessionState::deliver(DeliveryRecord& msg, bool sync)
}
}
-void SessionState::sendCompletion() {
- handler->sendCompletion();
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
}
void SessionState::senderCompleted(const SequenceSet& commands) {
@@ -282,6 +350,14 @@ void SessionState::readyToSend() {
sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
tasks.addOutputTask(&semanticState);
tasks.activateOutput();
+
+ if (rateFlowcontrol) {
+ // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().setFlowMode("", 0);
+ getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
+ rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+ }
}
Broker& SessionState::getBroker() { return broker; }