From 16a6c8873629972a7324509aac13bb950419176f Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 1 Oct 2009 18:09:10 +0000 Subject: QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820739 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 40 ++++++++++++++++++---- .../client/handler/ClientMethodDispatcherImpl.java | 4 ++- 2 files changed, 37 insertions(+), 7 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 948b21f8cf..fa15df34ec 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -115,6 +115,7 @@ public abstract class AMQSession { private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16]; @@ -199,16 +200,32 @@ public abstract class AMQSession= System.currentTimeMillis() ) + { + + _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD); + _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control"); + } + if(!_flowControl.getFlowControl()) { - _flowControl.wait(); + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control"); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java index 9c791730ca..0e3333940a 100644 --- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java +++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java @@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance(); private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance(); private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance(); + private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance(); private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance(); private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance(); private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance(); @@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException { - return false; + _channelFlowMethodHandler.methodReceived(_session, body, channelId); + return true; } public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException -- cgit v1.2.1