diff options
| author | Keith Wall <kwall@apache.org> | 2012-09-11 22:19:00 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-09-11 22:19:00 +0000 |
| commit | 36bfc6709c20cbbf5e565444d22fc14cb40e9251 (patch) | |
| tree | 4d4a2f5b07acff0f5d14afa9cee10f5c59686e6e /qpid/java | |
| parent | e4c0da5e67b49fbb9ecd7bd9f6938e5ec7ca4a69 (diff) | |
| download | qpid-python-36bfc6709c20cbbf5e565444d22fc14cb40e9251.tar.gz | |
QPID-4296: Push down 0-8..0-9-1 flow control implementation to AMQSession_0_8 (refactoring)
Move method impls. isFlowBlocked(), setFlowBlocked() and checkFlowBlocked() and their associated fields
down to AMQSession_0_8.
On 0-10, isFlowBlocked() was already overridden to delegate to the 0-10 transport layer.
The operation setFlowBlocked() makes no sense to 0-10, so will be implemented to throw
UnsupportedOperationException.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1383638 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
3 files changed, 94 insertions, 79 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f258404e2d..9af06eeaf4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,19 +115,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Immediate message prefetch default. */ public static final String IMMEDIATE_PREFETCH_DEFAULT = "false"; - /** - * The period to wait while flow controlled before sending a log message confirming that the session is still - * waiting on flow control being revoked - */ - private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, - DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); - - /** - * The period to wait while flow controlled before declaring a failure - */ - private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, - DEFAULT_FLOW_CONTROL_WAIT_FAILURE); - private final boolean _delareQueues = Boolean.parseBoolean(System.getProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "true")); @@ -263,11 +245,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic /** Has failover occured on this session with outstanding actions to commit? */ private boolean _failedOverDirty; - /** Flow control */ - private FlowControlIndicator _flowControl = new FlowControlIndicator(); - - - /** Holds the highest received delivery tag. */ protected AtomicLong getHighestDeliveryTag() { @@ -406,22 +383,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } - private static final class FlowControlIndicator - { - private volatile boolean _flowControl = true; - - public synchronized void setFlowControl(boolean flowControl) - { - _flowControl = flowControl; - notify(); - } - - public boolean getFlowControl() - { - return _flowControl; - } - } - /** * Creates a new session on a connection. * @@ -3087,47 +3048,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic _ticket = ticket; } - public boolean isFlowBlocked() - { - synchronized (_flowControl) - { - return !_flowControl.getFlowControl(); - } - } - - public void setFlowControl(final boolean active) - { - _flowControl.setFlowControl(active); - if (_logger.isInfoEnabled()) - { - _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); - } - } - - public void checkFlowControl() throws InterruptedException, JMSException - { - long expiryTime = 0L; - synchronized (_flowControl) - { - while (!_flowControl.getFlowControl() && - (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) - : expiryTime) >= System.currentTimeMillis() ) - { - - _flowControl.wait(_flowControlWaitPeriod); - if (_logger.isInfoEnabled()) - { - _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); - } - } - if(!_flowControl.getFlowControl()) - { - _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); - } - } + /** + * Tests whether flow to this session is blocked. + * + * @return true if flow is blocked or false otherwise. + */ + public abstract boolean isFlowBlocked(); - } + public abstract void setFlowControl(final boolean active); public interface Dispatchable { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dcbdadf46d..e632827342 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1410,6 +1410,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic return _qpidSession.isFlowBlocked(); } + @Override + public void setFlowControl(boolean active) + { + // Supported by 0-8..0-9-1 only + throw new UnsupportedOperationException("Operation not supported by this protocol"); + } + private void cancelTimerTask() { if (flushTask != null) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 9cd5eb1491..ade7ab8033 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -21,6 +21,11 @@ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; +import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +65,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class); /** + * The period to wait while flow controlled before sending a log message confirming that the session is still + * waiting on flow control being revoked + */ + private final long _flowControlWaitPeriod = Long.getLong(QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD, + DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD); + + /** + * The period to wait while flow controlled before declaring a failure + */ + private final long _flowControlWaitFailure = Long.getLong(QPID_FLOW_CONTROL_WAIT_FAILURE, + DEFAULT_FLOW_CONTROL_WAIT_FAILURE); + + /** Flow control */ + private FlowControlIndicator _flowControl = new FlowControlIndicator(); + + /** * Creates a new session on a connection. * * @param con The connection on which to create the session. @@ -728,6 +749,49 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + public boolean isFlowBlocked() + { + synchronized (_flowControl) + { + return !_flowControl.getFlowControl(); + } + } + + public void setFlowControl(final boolean active) + { + _flowControl.setFlowControl(active); + if (_logger.isInfoEnabled()) + { + _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced")); + } + } + + void checkFlowControl() throws InterruptedException, JMSException + { + long expiryTime = 0L; + synchronized (_flowControl) + { + while (!_flowControl.getFlowControl() && + (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + _flowControlWaitFailure) + : expiryTime) >= System.currentTimeMillis() ) + { + + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } + } + if(!_flowControl.getFlowControl()) + { + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); + } + } + } + + + public abstract static class DestinationCache<T extends AMQDestination> { private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>(); @@ -775,6 +839,22 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe } } + private static final class FlowControlIndicator + { + private volatile boolean _flowControl = true; + + public synchronized void setFlowControl(boolean flowControl) + { + _flowControl = flowControl; + notify(); + } + + public boolean getFlowControl() + { + return _flowControl; + } + } + private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache(); private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache(); |
