summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
committerAlan Conway <aconway@apache.org>2009-02-24 19:48:54 +0000
commit5996f46bccf1c0fa6bda145566d11b01064ef6dd (patch)
tree61cee350c55444ffb2ab02262c50fb2699037e7f /cpp/src/qpid/broker
parent338297ff8c2c65a4226f3bc3fdd4da49269cfc9a (diff)
downloadqpid-python-5996f46bccf1c0fa6bda145566d11b01064ef6dd.tar.gz
Fixed issue with producer flow control in a cluster.
Producer flow control uses a Timer and other clock-based calculations to send flow control commands. These commands are not predictably ordered from the clusters point of view. Added getClusterOrderProxy() to SessionState. In a cluster it returns a proxy that defers sending a command to the client until it is multicast to the cluster. In a stand alone broker it is just the normal proxy. Updated producer flow control to use this proxy. Cluster flow control is turned off in shadow connections. Only the directly connected node does flow control calculations and multicasts the commands to send. All nodes sending of the commands thru SessionState to ensure consistent session state (e.g. command numbering.) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@747528 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h21
-rw-r--r--cpp/src/qpid/broker/SessionHandler.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionHandler.h18
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp17
-rw-r--r--cpp/src/qpid/broker/SessionState.h11
5 files changed, 57 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 0e9d211b56..0d7fbc5b3b 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -48,8 +48,9 @@ class ConnectionState : public ConnectionToken, public management::Manageable
heartbeatmax(120),
stagingThreshold(broker.getStagingThreshold()),
federationLink(true),
- clientSupportsThrottling(false)
- {}
+ clientSupportsThrottling(false),
+ clusterOrderOut(0)
+ {}
virtual ~ConnectionState () {}
@@ -75,7 +76,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
const string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
- void setClientThrottling() { clientSupportsThrottling = true; }
+ void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
bool getClientThrottling() const { return clientSupportsThrottling; }
Broker& getBroker() { return broker; }
@@ -86,11 +87,20 @@ class ConnectionState : public ConnectionToken, public management::Manageable
//contained output tasks
sys::AggregateOutput outputTasks;
- sys::ConnectionOutputHandlerPtr& getOutput() { return out; }
+ sys::ConnectionOutputHandler& getOutput() { return out; }
framing::ProtocolVersion getVersion() const { return version; }
-
void setOutputHandler(qpid::sys::ConnectionOutputHandler* o) { out.set(o); }
+ /**
+ * If the broker is part of a cluster, this is a handler provided
+ * by cluster code. It ensures consistent ordering of commands
+ * that are sent based on criteria that are not predictably
+ * ordered cluster-wide, e.g. a timer firing.
+ */
+ framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
+ void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
+
+
protected:
framing::ProtocolVersion version;
uint32_t framemax;
@@ -103,6 +113,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
string federationPeerTag;
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
+ framing::FrameHandler* clusterOrderOut;
};
}}
diff --git a/cpp/src/qpid/broker/SessionHandler.cpp b/cpp/src/qpid/broker/SessionHandler.cpp
index 2c4de478f6..5bdc1e2500 100644
--- a/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/cpp/src/qpid/broker/SessionHandler.cpp
@@ -34,7 +34,8 @@ using namespace qpid::sys;
SessionHandler::SessionHandler(Connection& c, ChannelId ch)
: amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
- proxy(out)
+ proxy(out),
+ clusterOrderProxy(c.getClusterOrderOutput() ? new SetChannelProxy(ch, c.getClusterOrderOutput()) : 0)
{}
SessionHandler::~SessionHandler() {}
diff --git a/cpp/src/qpid/broker/SessionHandler.h b/cpp/src/qpid/broker/SessionHandler.h
index 7449db1560..698e4f397f 100644
--- a/cpp/src/qpid/broker/SessionHandler.h
+++ b/cpp/src/qpid/broker/SessionHandler.h
@@ -54,6 +54,17 @@ class SessionHandler : public amqp_0_10::SessionHandler {
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy() {
+ return clusterOrderProxy.get() ? *clusterOrderProxy : proxy;
+ }
+
virtual void handleDetach();
// Overrides
@@ -69,9 +80,16 @@ class SessionHandler : public amqp_0_10::SessionHandler {
virtual void readyToSend();
private:
+ struct SetChannelProxy : public framing::AMQP_ClientProxy { // Proxy that sets the channel.
+ framing::ChannelHandler setChannel;
+ SetChannelProxy(uint16_t ch, framing::FrameHandler* out)
+ : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
+ };
+
Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
+ std::auto_ptr<SetChannelProxy> clusterOrderProxy;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index dffc7cf6af..b64fc20787 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -66,7 +66,7 @@ SessionState::SessionState(
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
+ rateFlowcontrol.reset(new RateFlowcontrol(maxRate));
} else {
QPID_LOG(warning, getId() << ": Unable to flow control client - client doesn't support");
}
@@ -210,7 +210,6 @@ struct ScheduledCreditTask : public TimerTask {
{}
void fire() {
- QPID_LOG(critical, "ScheduledCreditTask fired"); // FIXME aconway 2009-02-23: REMOVE
// This is the best we can currently do to avoid a destruction/fire race
if (!isCancelled()) {
if ( !sessionState.processSendCredit(0) ) {
@@ -275,7 +274,8 @@ bool SessionState::processSendCredit(uint32_t msgs)
if ( msgs > 0 && rateFlowcontrol->flowStopped() ) {
QPID_LOG(warning, getId() << ": producer throttling violation");
// TODO: Probably do message.stop("") first time then disconnect
- getProxy().getMessage().stop("");
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().stop("");
return true;
}
AbsTime now = AbsTime::now();
@@ -283,7 +283,7 @@ bool SessionState::processSendCredit(uint32_t msgs)
if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
if ( sendCredit>0 ) {
QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
+ getClusterOrderProxy().getMessage().flow("", 0, sendCredit);
rateFlowcontrol->sentCredit(now, sendCredit);
if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
return true;
@@ -364,8 +364,9 @@ void SessionState::readyToSend() {
// Issue initial credit - use a heuristic here issue min of 300 messages or 1 secs worth
uint32_t credit = std::min(rateFlowcontrol->getRate(), 300U);
QPID_LOG(debug, getId() << ": Issuing producer message credit " << credit);
- getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0, credit);
+ // See comment on getClusterOrderProxy() in .h file
+ getClusterOrderProxy().getMessage().setFlowMode("", 0);
+ getClusterOrderProxy().getMessage().flow("", 0, credit);
rateFlowcontrol->sentCredit(AbsTime::now(), credit);
if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
@@ -373,4 +374,8 @@ void SessionState::readyToSend() {
Broker& SessionState::getBroker() { return broker; }
+framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() {
+ return handler->getClusterOrderProxy();
+}
+
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h
index c435a741f8..b64461eb86 100644
--- a/cpp/src/qpid/broker/SessionState.h
+++ b/cpp/src/qpid/broker/SessionState.h
@@ -125,6 +125,15 @@ class SessionState : public qpid::SessionState,
void sendAcceptAndCompletion();
+ /**
+ * If commands are sent based on the local time (e.g. in timers), they don't have
+ * a well-defined ordering across cluster nodes.
+ * This proxy is for sending such commands. In a clustered broker it will take steps
+ * to synchronize command order across the cluster. In a stand-alone broker
+ * it is just a synonym for getProxy()
+ */
+ framing::AMQP_ClientProxy& getClusterOrderProxy();
+
Broker& broker;
SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
@@ -138,7 +147,7 @@ class SessionState : public qpid::SessionState,
// State used for producer flow control (rate limited)
qpid::sys::Mutex rateLock;
- RateFlowcontrol* rateFlowcontrol;
+ boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
friend class SessionManager;