summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-01 18:09:10 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-01 18:09:10 +0000
commit16a6c8873629972a7324509aac13bb950419176f (patch)
treeeb1b3c5e886b0adbf40005795ab1c9c414e46891 /java/client/src
parent054546a9f7dda8e6877550c46eb258c623e67de6 (diff)
downloadqpid-python-16a6c8873629972a7324509aac13bb950419176f.tar.gz
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@820739 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java40
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java4
2 files changed, 37 insertions, 7 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 948b21f8cf..fa15df34ec 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -115,6 +115,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -199,16 +200,32 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -2274,7 +2291,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2745,15 +2762,26 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
index 9c791730ca..0e3333940a 100644
--- a/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
+++ b/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
@@ -39,6 +39,7 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@ public class ClientMethodDispatcherImpl implements MethodDispatcher
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- return false;
+ _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+ return true;
}
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException