diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-27 21:17:47 +0000 |
| commit | 2dff9493ceb62d37a3b70a4abd6bc0539bdb581e (patch) | |
| tree | 0003c4766da8f32e8fc2b9f5b4968391b7319492 /cpp/src/qpid/broker | |
| parent | 3f547381f1af5cdb9d7c5f9cc30f7303d643afd9 (diff) | |
| download | qpid-python-2dff9493ceb62d37a3b70a4abd6bc0539bdb581e.tar.gz | |
Producer side rate throttling:
This uses the Message.Flow command to send credit from
broker to client to ensure that the client doesn't
exceed a rate configured on the broker per session.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@738247 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/RateFlowcontrol.h | 96 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.cpp | 110 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/SessionState.h | 8 |
5 files changed, 201 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index e4c5c9b5e9..f692ff72f3 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -90,7 +90,8 @@ Broker::Options::Options(const std::string& name) : replayHardLimit(0), queueLimit(100*1048576/*100M default limit*/), tcpNoDelay(false), - requireEncrypted(false) + requireEncrypted(false), + maxSessionRate(0) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -119,7 +120,8 @@ Broker::Options::Options(const std::string& name) : ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)") ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections") ("require-encryption", optValue(requireEncrypted), "Only accept connections that are encrypted") - ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)"); + ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") + ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)"); } const std::string empty; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index d97737c707..c50ef46baa 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -105,6 +105,7 @@ class Broker : public sys::Runnable, public Plugin::Target, bool tcpNoDelay; bool requireEncrypted; std::string knownHosts; + uint32_t maxSessionRate; }; private: diff --git a/cpp/src/qpid/broker/RateFlowcontrol.h b/cpp/src/qpid/broker/RateFlowcontrol.h new file mode 100644 index 0000000000..3323097eff --- /dev/null +++ b/cpp/src/qpid/broker/RateFlowcontrol.h @@ -0,0 +1,96 @@ +#ifndef broker_RateFlowcontrol_h +#define broker_RateFlowcontrol_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include "qpid/sys/IntegerTypes.h" + +#include <algorithm> + +namespace qpid { +namespace broker { + +// Class to keep track of issuing flow control to make sure that the peer doesn't exceed +// a given message rate +// +// Create the object with the target rate +// Then call sendCredit() whenever credit is issued to the peer +// Call receivedMessage() whenever a message is received, it returns the credit to issue. +// +// sentCredit() be sensibly called with a 0 parameter to indicate +// that we sent credit but treat it as if the value was 0 (we may do this at the start of the connection +// to allow our peer to send messages) +// +// receivedMessage() can be called with 0 to indicate that we've not received a message, but +// tell me what credit I can send. +class RateFlowcontrol { + uint32_t rate; // messages per second + uint32_t maxCredit; // max credit issued to client (issued at start) + uint32_t requestedCredit; + qpid::sys::AbsTime creditSent; + +public: + RateFlowcontrol(uint32_t r) : + rate(r), + maxCredit(0), + requestedCredit(0), + creditSent(qpid::sys::FAR_FUTURE) + {} + + uint32_t getRate() const { + return rate; + } + void sentCredit(const qpid::sys::AbsTime& t, uint32_t credit); + uint32_t receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs); + bool flowStopped() const; +}; + +inline void RateFlowcontrol::sentCredit(const qpid::sys::AbsTime& t, uint32_t credit) { + // If the client isn't currently requesting credit (ie it's not sent us anything yet) then + // this credit goes to the max credit held by the client (it can't go to reduce credit + // less than 0) + int32_t nextRequestedCredit = requestedCredit - credit; + if ( nextRequestedCredit<0 ) { + requestedCredit = 0; + maxCredit -= nextRequestedCredit; + } else { + requestedCredit = nextRequestedCredit; + } + creditSent = t; +} + +inline uint32_t RateFlowcontrol::receivedMessage(const qpid::sys::AbsTime& t, uint32_t msgs) { + requestedCredit +=msgs; + qpid::sys::Duration d(creditSent, t); + // Could be -ve before first sentCredit + int64_t toSend = std::min(rate * d / qpid::sys::TIME_SEC, static_cast<int64_t>(requestedCredit)); + return toSend > 0 ? toSend : 0; +} + +inline bool RateFlowcontrol::flowStopped() const { + return requestedCredit >= maxCredit; +} + +}} + +#endif // broker_RateFlowcontrol_h diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 0a24a39d38..5039b31874 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,6 +24,8 @@ #include "DeliveryRecord.h" #include "SessionManager.h" #include "SessionHandler.h" +#include "RateFlowcontrol.h" +#include "Timer.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" #include "qpid/framing/AMQMethodBody.h" @@ -31,6 +33,7 @@ #include "qpid/framing/ServerInvoker.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementBroker.h" +#include "qpid/framing/AMQP_ClientProxy.h" #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> @@ -46,17 +49,19 @@ using qpid::management::ManagementBroker; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using qpid::sys::AbsTime; namespace _qmf = qmf::org::apache::qpid::broker; SessionState::SessionState( - Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) + Broker& b, SessionHandler& h, const SessionId& id, const SessionState::Configuration& config) : qpid::SessionState(id, config), broker(b), handler(&h), semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore(), broker.getStagingThreshold()), enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), - mgmtObject(0) + mgmtObject(0), + rateFlowcontrol(0) { Manageable* parent = broker.GetVhostObject (); if (parent != 0) { @@ -71,12 +76,19 @@ SessionState::SessionState( agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0); } } + uint32_t maxRate = broker.getOptions().maxSessionRate; + if (maxRate) { + rateFlowcontrol = new RateFlowcontrol(maxRate); + } attach(h); } SessionState::~SessionState() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); + + if (flowControlTimer) + flowControlTimer->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -102,7 +114,7 @@ void SessionState::detach() { mgmtObject->set_attached (0); } -void SessionState::disableOutput() +void SessionState::disableOutput() { semanticState.detached();//prevents further activateOutput calls until reattached getConnection().outputTasks.removeOutputTask(&semanticState); @@ -120,12 +132,12 @@ void SessionState::attach(SessionHandler& h) { } void SessionState::activateOutput() { - if (isAttached()) + if (isAttached()) getConnection().outputTasks.activateOutput(); } void SessionState::giveReadCredit(int32_t credit) { - if (isAttached()) + if (isAttached()) getConnection().outputTasks.giveReadCredit(credit); } @@ -170,18 +182,49 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + receiverCompleted(id); if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { + if (method->isSync()) { incomplete.process(enqueuedOp, true); sendAcceptAndCompletion(); } } +struct ScheduledCreditTask : public TimerTask { + Timer& timer; + SessionState& sessionState; + RateFlowcontrol& flowControl; + ScheduledCreditTask(const qpid::sys::Duration& d, Timer& t, + SessionState& s, RateFlowcontrol& f) : + TimerTask(d), + timer(t), + sessionState(s), + flowControl(f) + {} + + void fire() { + // This is the best we can currently do to avoid a destruction/fire race + if (!isCancelled()) { + // Send credit + AbsTime now = AbsTime::now(); + uint32_t sendCredit = flowControl.receivedMessage(now, 0); + if ( sendCredit>0 ) { + QPID_LOG(debug, sessionState.getId() << ": send producer credit " << sendCredit); + sessionState.getProxy().getMessage().flow("", 0, sendCredit); + flowControl.sentCredit(now, sendCredit); + } else if ( flowControl.flowStopped() ) { + QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit"); + reset(); + timer.add(this); + } + } + } +}; + void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) { if (frame.getBof() && frame.getBos()) //start of frameset @@ -194,10 +237,10 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) AMQFrame header((AMQHeaderBody())); header.setBof(false); header.setEof(false); - msg->getFrames().append(header); + msg->getFrames().append(header); } msg->setPublisher(&getConnection()); - semanticState.handle(msg); + semanticState.handle(msg); msgBuilder.end(); if (msg->isEnqueueComplete()) { @@ -206,14 +249,39 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) incomplete.add(msg); } - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { + //hold up execution until async enqueue is complete + if (msg->getFrames().getMethod()->isSync()) { incomplete.process(enqueuedOp, true); sendAcceptAndCompletion(); } else { incomplete.process(enqueuedOp, false); } } + + // Handle producer session flow control + if (rateFlowcontrol && frame.getBof() && frame.getBos()) { + // Check for violating flow control + if ( rateFlowcontrol->flowStopped() ) { + QPID_LOG(warning, getId() << ": producer throttling violation"); + // TODO: Probably do message.stop("") first time then disconnect + getProxy().getMessage().stop(""); + } else { + AbsTime now = AbsTime::now(); + uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1); + if ( sendCredit>0 ) { + QPID_LOG(debug, getId() << ": send producer credit " << sendCredit); + getProxy().getMessage().flow("", 0, sendCredit); + rateFlowcontrol->sentCredit(now, sendCredit); + } else if ( rateFlowcontrol->flowStopped() ) { + QPID_LOG(debug, getId() << ": Schedule sending credit"); + Timer& timer = getBroker().getTimer(); + // Use heuristic for scheduled credit of time for 50 messages, but not longer than 500ms + sys::Duration d = std::min(sys::TIME_SEC * 50 / rateFlowcontrol->getRate(), 500 * sys::TIME_MSEC); + flowControlTimer = new ScheduledCreditTask(d, timer, *this, *rateFlowcontrol); + timer.add(flowControlTimer); + } + } + } } void SessionState::sendAcceptAndCompletion() @@ -222,7 +290,7 @@ void SessionState::sendAcceptAndCompletion() getProxy().getMessage().accept(accepted); accepted.clear(); } - sendCompletion(); + sendCompletion(); } void SessionState::enqueued(boost::intrusive_ptr<Message> msg) @@ -240,7 +308,7 @@ void SessionState::handleIn(AMQFrame& frame) { if (m == 0 || m->isContentBearing()) { handleContent(frame, commandId); } else if (frame.getBof() && frame.getEof()) { - handleCommand(frame.getMethod(), commandId); + handleCommand(frame.getMethod(), commandId); } else { throw InternalErrorException("Cannot handle multi-frame command segments yet"); } @@ -265,8 +333,8 @@ void SessionState::deliver(DeliveryRecord& msg, bool sync) } } -void SessionState::sendCompletion() { - handler->sendCompletion(); +void SessionState::sendCompletion() { + handler->sendCompletion(); } void SessionState::senderCompleted(const SequenceSet& commands) { @@ -282,6 +350,14 @@ void SessionState::readyToSend() { sys::AggregateOutput& tasks = handler->getConnection().outputTasks; tasks.addOutputTask(&semanticState); tasks.activateOutput(); + + if (rateFlowcontrol) { + // Issue initial credit - use a heuristic here issue min of 100 messages or 1 secs worth + QPID_LOG(debug, getId() << ": Issuing producer message credit " << std::min(rateFlowcontrol->getRate(), 100U)); + getProxy().getMessage().setFlowMode("", 0); + getProxy().getMessage().flow("", 0, std::min(rateFlowcontrol->getRate(), 100U)); + rateFlowcontrol->sentCredit(AbsTime::now(), std::min(rateFlowcontrol->getRate(), 100U)); + } } Broker& SessionState::getBroker() { return broker; } diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index f5f1bde2a2..29ca2665ea 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -55,6 +55,8 @@ class ConnectionState; class Message; class SessionHandler; class SessionManager; +class RateFlowcontrol; +class TimerTask; /** * Broker-side session state includes session's handler chains, which @@ -132,7 +134,11 @@ class SessionState : public qpid::SessionState, qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; - friend class SessionManager; + // State used for producer flow control (rate limited) + RateFlowcontrol* rateFlowcontrol; + boost::intrusive_ptr<TimerTask> flowControlTimer; + + friend class SessionManager; }; |
