diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2009-10-01 18:09:10 +0000 |
| commit | 16a6c8873629972a7324509aac13bb950419176f (patch) | |
| tree | eb1b3c5e886b0adbf40005795ab1c9c414e46891 /java/client/src | |
| parent | 054546a9f7dda8e6877550c46eb258c623e67de6 (diff) | |
| download | qpid-python-16a6c8873629972a7324509aac13bb950419176f.tar.gz | |
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820739 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 40 | ||||
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java | 4 |
2 files changed, 37 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 948b21f8cf..fa15df34ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -115,6 +115,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { + public static final class IdToConsumerMap<C extends BasicMessageConsumer> { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -199,16 +200,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic * The default value for immediate flag used by producers created by this session is false. That is, a consumer does * not need to be attached to a queue. */ - protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); + protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false")); /** * The default value for mandatory flag used by producers created by this session is true. That is, server will not * silently drop messages where no queue is connected to the exchange for the message. */ - protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true")); + + protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false")); + + /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + + /** + * The period to wait while flow controlled before declaring a failure + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; + protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure", + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); protected final boolean DECLARE_QUEUES = Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true")); + protected final boolean DECLARE_EXCHANGES = Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true")); @@ -2274,7 +2291,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate) throws JMSException { - return createProducerImpl(destination, mandatory, immediate, false); + return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND); } private P createProducerImpl(final Destination destination, final boolean mandatory, @@ -2745,15 +2762,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public void setFlowControl(final boolean active) { _flowControl.setFlowControl(active); + _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); } - public void checkFlowControl() throws InterruptedException + public void checkFlowControl() throws InterruptedException, JMSException { + long expiryTime = 0L; synchronized (_flowControl) { - while (!_flowControl.getFlowControl()) + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException |
