summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2009-01-27 21:17:47 +0000
commit2dff9493ceb62d37a3b70a4abd6bc0539bdb581e (patch)
tree0003c4766da8f32e8fc2b9f5b4968391b7319492 /cpp/src/qpid/broker
parent3f547381f1af5cdb9d7c5f9cc30f7303d643afd9 (diff)
downloadqpid-python-2dff9493ceb62d37a3b70a4abd6bc0539bdb581e.tar.gz
Producer side rate throttling:
This uses the Message.Flow command to send credit from broker to client to ensure that the client doesn't exceed a rate configured on the broker per session. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/RateFlowcontrol.h96
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp110
-rw-r--r--cpp/src/qpid/broker/SessionState.h8
5 files changed, 201 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index e4c5c9b5e9..f692ff72f3 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -90,7 +90,8 @@ Broker::Options::Options(const std::string& name) :
replayHardLimit(0),
queueLimit(100*1048576/*100M default limit*/),
tcpNoDelay(false),
- requireEncrypted(false)
+ requireEncrypted(false),
+ maxSessionRate(0)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -119,7 +120,8 @@ Broker::Options::Options(const std::string& name) :
("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections")
("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted")
- ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)");
+ ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)")
+ ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)");
}
const std::string empty;
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index d97737c707..c50ef46baa 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -105,6 +105,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool tcpNoDelay;
bool requireEncrypted;
std::string knownHosts;
+ uint32_t maxSessionRate;
};
private:
diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h
new file mode 100644
index 0000000000..3323097eff
--- /dev/null
+++ b/cpp/src/qpid/broker/RateFlowcontrol.h
@@ -0,0 +1,96 @@
+#ifndef broker_RateFlowcontrol_h
+#define broker_RateFlowcontrol_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include "qpid/sys/IntegerTypes.h"
+
+#include <algorithm>
+
+namespace qpid {
+namespace broker {
+
+// Class to keep track of issuing flow control to make sure that the peer doesn't exceed
+// a given message rate
+//
+// Create the object with the target rate
+// Then call sendCredit() whenever credit is issued to the peer
+// Call receivedMessage() whenever a message is received, it returns the credit to issue.
+//
+// sentCredit() be sensibly called with a 0 parameter to indicate
+// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection
+// to allow our peer to send messages)
+//
+// receivedMessage() can be called with 0 to indicate that we've not received a message, but
+// tell me what credit I can send.
+class RateFlowcontrol {
+ uint32_t rate; // messages per second
+ uint32_t maxCredit; // max credit issued to client (issued at start)
+ uint32_t requestedCredit;
+ qpid::sys::AbsTime creditSent;
+
+public:
+ RateFlowcontrol(uint32_t r) :
+ rate(r),
+ maxCredit(0),
+ requestedCredit(0),
+ creditSent(qpid::sys::FAR_FUTURE)
+ {}
+
+ uint32_t getRate() const {
+ return rate;
+ }
+ void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit);
+ uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs);
+ bool flowStopped() const;
+};
+
+inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) {
+ // If the client isn't currently requesting credit (ie it's not sent us anything yet) then
+ // this credit goes to the max credit held by the client (it can't go to reduce credit
+ // less than 0)
+ int32_t nextRequestedCredit = requestedCredit - credit;
+ if ( nextRequestedCredit<0 ) {
+ requestedCredit = 0;
+ maxCredit -= nextRequestedCredit;
+ } else {
+ requestedCredit = nextRequestedCredit;
+ }
+ creditSent = t;
+}
+
+inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) {
+ requestedCredit +=msgs;
+ qpid::sys::Duration d(creditSent, t);
+ // Could be -ve before first sentCredit
+ int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit));
+ return toSend > 0 ? toSend : 0;
+}
+
+inline bool RateFlowcontrol::flowStopped() const {
+ return requestedCredit >= maxCredit;
+}
+
+}}
+
+#endif // broker_RateFlowcontrol_h
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index 0a24a39d38..5039b31874 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,6 +24,8 @@
#include "DeliveryRecord.h"
#include "SessionManager.h"
#include "SessionHandler.h"
+#include "RateFlowcontrol.h"
+#include "Timer.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
#include "qpid/framing/AMQMethodBody.h"
@@ -31,6 +33,7 @@
#include "qpid/framing/ServerInvoker.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementBroker.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
@@ -46,17 +49,19 @@ using qpid::management::ManagementBroker;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using qpid::sys::AbsTime;
namespace _qmf = qmf::org::apache::qpid::broker;
SessionState::SessionState(
- Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
+ Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config)
: qpid::SessionState(id, config),
broker(b), handler(&h),
semanticState(*this, *this),
adapter(semanticState),
msgBuilder(&broker.getStore(), broker.getStagingThreshold()),
enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)),
- mgmtObject(0)
+ mgmtObject(0),
+ rateFlowcontrol(0)
{
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
@@ -71,12 +76,19 @@ SessionState::SessionState(
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
+
+ if (flowControlTimer)
+ flowControlTimer->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -102,7 +114,7 @@ void SessionState::detach() {
mgmtObject->set_attached (0);
}
-void SessionState::disableOutput()
+void SessionState::disableOutput()
{
semanticState.detached();//prevents further activateOutput calls until reattached
getConnection().outputTasks.removeOutputTask(&semanticState);
@@ -120,12 +132,12 @@ void SessionState::attach(SessionHandler& h) {
}
void SessionState::activateOutput() {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.activateOutput();
}
void SessionState::giveReadCredit(int32_t credit) {
- if (isAttached())
+ if (isAttached())
getConnection().outputTasks.giveReadCredit(credit);
}
@@ -170,18 +182,49 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId,
void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) {
Invoker::Result invocation = invoke(adapter, *method);
- receiverCompleted(id);
+ receiverCompleted(id);
if (!invocation.wasHandled()) {
throw NotImplementedException(QPID_MSG("Not implemented: " << *method));
} else if (invocation.hasResult()) {
getProxy().getExecution().result(id, invocation.getResult());
}
- if (method->isSync()) {
+ if (method->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
}
}
+struct ScheduledCreditTask : public TimerTask {
+ Timer& timer;
+ SessionState& sessionState;
+ RateFlowcontrol& flowControl;
+ ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t,
+ SessionState& s, RateFlowcontrol& f) :
+ TimerTask(d),
+ timer(t),
+ sessionState(s),
+ flowControl(f)
+ {}
+
+ void fire() {
+ // This is the best we can currently do to avoid a destruction/fire race
+ if (!isCancelled()) {
+ // Send credit
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = flowControl.receivedMessage(now, 0);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit);
+ sessionState.getProxy().getMessage().flow("", 0, sendCredit);
+ flowControl.sentCredit(now, sendCredit);
+ } else if ( flowControl.flowStopped() ) {
+ QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+ reset();
+ timer.add(this);
+ }
+ }
+ }
+};
+
void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
{
if (frame.getBof() && frame.getBos()) //start of frameset
@@ -194,10 +237,10 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
- msg->getFrames().append(header);
+ msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
- semanticState.handle(msg);
+ semanticState.handle(msg);
msgBuilder.end();
if (msg->isEnqueueComplete()) {
@@ -206,14 +249,39 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
incomplete.add(msg);
}
- //hold up execution until async enqueue is complete
- if (msg->getFrames().getMethod()->isSync()) {
+ //hold up execution until async enqueue is complete
+ if (msg->getFrames().getMethod()->isSync()) {
incomplete.process(enqueuedOp, true);
sendAcceptAndCompletion();
} else {
incomplete.process(enqueuedOp, false);
}
}
+
+ // Handle producer session flow control
+ if (rateFlowcontrol && frame.getBof() && frame.getBos()) {
+ // Check for violating flow control
+ if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(warning, getId() << ": producer throttling violation");
+ // TODO: Probably do message.stop("") first time then disconnect
+ getProxy().getMessage().stop("");
+ } else {
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ } else if ( rateFlowcontrol->flowStopped() ) {
+ QPID_LOG(debug, getId() << ": Schedule sending credit");
+ Timer& timer = getBroker().getTimer();
+ // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms
+ sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC);
+ flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol);
+ timer.add(flowControlTimer);
+ }
+ }
+ }
}
void SessionState::sendAcceptAndCompletion()
@@ -222,7 +290,7 @@ void SessionState::sendAcceptAndCompletion()
getProxy().getMessage().accept(accepted);
accepted.clear();
}
- sendCompletion();
+ sendCompletion();
}
void SessionState::enqueued(boost::intrusive_ptr<Message> msg)
@@ -240,7 +308,7 @@ void SessionState::handleIn(AMQFrame& frame) {
if (m == 0 || m->isContentBearing()) {
handleContent(frame, commandId);
} else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
+ handleCommand(frame.getMethod(), commandId);
} else {
throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
@@ -265,8 +333,8 @@ void SessionState::deliver(DeliveryRecord& msg, bool sync)
}
}
-void SessionState::sendCompletion() {
- handler->sendCompletion();
+void SessionState::sendCompletion() {
+ handler->sendCompletion();
}
void SessionState::senderCompleted(const SequenceSet& commands) {
@@ -282,6 +350,14 @@ void SessionState::readyToSend() {
sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
tasks.addOutputTask(&semanticState);
tasks.activateOutput();
+
+ if (rateFlowcontrol) {
+ // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().setFlowMode("", 0);
+ getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U));
+ rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U));
+ }
}
Broker& SessionState::getBroker() { return broker; }
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index f5f1bde2a2..29ca2665ea 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -55,6 +55,8 @@ class ConnectionState;
class Message;
class SessionHandler;
class SessionManager;
+class RateFlowcontrol;
+class TimerTask;
/**
* Broker-side session state includes session's handler chains, which
@@ -132,7 +134,11 @@ class SessionState : public qpid::SessionState,
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
- friend class SessionManager;
+ // State used for producer flow control (rate limited)
+ RateFlowcontrol* rateFlowcontrol;
+ boost::intrusive_ptr<TimerTask> flowControlTimer;
+
+ friend class SessionManager;
};