diff options
Diffstat (limited to 'java')
4 files changed, 38 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 92a8f88d28..e500dac9e3 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,6 +20,11 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,18 +122,17 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L; - /** * The period to wait while flow controlled before sending a log message confirming that the session is still * waiting on flow control being revoked */ - private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); /** * The period to wait while flow controlled before declaring a failure */ - private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, DEFAULT_FLOW_CONTROL_WAIT_FAILURE); private final boolean _delareQueues = diff --git a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java index de3d8e67fd..0b4f0800d2 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java +++ b/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java @@ -33,9 +33,7 @@ public class FailoverPolicy { private static final Logger _logger = LoggerFactory.getLogger(FailoverPolicy.class); - private static final long MINUTE = 60000L; - - private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 1 * MINUTE); + private final long DEFAULT_METHOD_TIMEOUT = Long.getLong("qpid.failover_method_timeout", 120000); private FailoverMethod[] _methods = new FailoverMethod[1]; diff --git a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 3227bb6fc2..97fbd43ea0 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -168,4 +168,28 @@ public class ClientProperties public static final String SEND_BUFFER_SIZE_PROP_NAME = "qpid.send_buffer_size"; @Deprecated public static final String LEGACY_SEND_BUFFER_SIZE_PROP_NAME = "amqj.sendBufferSize"; + + /** + * System property to set the time (in millis) to wait before failing when sending and + * the client has been flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_FAILURE = "qpid.flow_control_wait_failure"; + + /** + * Default time (in millis) to wait before failing when sending and the client has been + * flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 60000L; + + /** + * System property to set the time (in millis) between log notifications that a + * send is waiting because the client was flow controlled by the broker. + */ + public static final String QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = "qpid.flow_control_wait_notify_period"; + + /** + * Default time (in millis) between log notifications that a send is + * waiting because the client was flow controlled by the broker. + */ + public static final long DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD = 5000L; } diff --git a/java/common/src/main/java/org/apache/qpid/transport/Session.java b/java/common/src/main/java/org/apache/qpid/transport/Session.java index 9a9e131adc..95c3e4669f 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Session.java @@ -94,8 +94,10 @@ public class Session extends SessionInvoker private final long timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); - private final long blockedSendTimeout = Long.getLong("qpid.flow_control_wait_failure", timeout); - private long blockedSendReportingPeriod = Long.getLong("qpid.flow_control_wait_notify_period",5000L); + private final long blockedSendTimeout = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + private long blockedSendReportingPeriod = Long.getLong(ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); private boolean autoSync = false; @@ -215,12 +217,6 @@ public class Session extends SessionInvoker return this.state; } - public boolean isFlowControlled() - { - return flowControl; - } - - void setFlowControl(boolean value) { flowControl = value; @@ -1201,6 +1197,6 @@ public class Session extends SessionInvoker */ public boolean isFlowBlocked() { - return isFlowControlled() && credit.availablePermits() == 0; + return flowControl && credit.availablePermits() == 0; } } |
