From 36bfc6709c20cbbf5e565444d22fc14cb40e9251 Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Tue, 11 Sep 2012 22:19:00 +0000 Subject: QPID-4296: Push down 0-8..0-9-1 flow control implementation to AMQSession_0_8 (refactoring) Move method impls. isFlowBlocked(), setFlowBlocked() and checkFlowBlocked() and their associated fields down to AMQSession_0_8. On 0-10, isFlowBlocked() was already overridden to delegate to the 0-10 transport layer. The operation setFlowBlocked() makes no sense to 0-10, so will be implemented to throw UnsupportedOperationException. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1383638 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 86 ++-------------------- .../org/apache/qpid/client/AMQSession_0_10.java | 7 ++ .../org/apache/qpid/client/AMQSession_0_8.java | 80 ++++++++++++++++++++ 3 files changed, 94 insertions(+), 79 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index f258404e2d..9af06eeaf4 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -20,11 +20,6 @@ */ package org.apache.qpid.client; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.DEFAULT_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_FAILURE; -import static org.apache.qpid.configuration.ClientProperties.QPID_FLOW_CONTROL_WAIT_NOTIFY_PERIOD; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,19 +115,6 @@ public abstract class AMQSession= System.currentTimeMillis() ) - { - - _flowControl.wait(_flowControlWaitPeriod); - if (_logger.isInfoEnabled()) - { - _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); - } - } - if(!_flowControl.getFlowControl()) - { - _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); - throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); - } - } + /** + * Tests whether flow to this session is blocked. + * + * @return true if flow is blocked or false otherwise. + */ + public abstract boolean isFlowBlocked(); - } + public abstract void setFlowControl(final boolean active); public interface Dispatchable { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index dcbdadf46d..e632827342 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1410,6 +1410,13 @@ public class AMQSession_0_10 extends AMQSession= System.currentTimeMillis() ) + { + + _flowControl.wait(_flowControlWaitPeriod); + if (_logger.isInfoEnabled()) + { + _logger.info("Message send delayed by " + (System.currentTimeMillis() + _flowControlWaitFailure - expiryTime)/1000 + "s due to broker enforced flow control"); + } + } + if(!_flowControl.getFlowControl()) + { + _logger.error("Message send failed due to timeout waiting on broker enforced flow control"); + throw new JMSException("Unable to send message for " + _flowControlWaitFailure /1000 + " seconds due to broker enforced flow control"); + } + } + } + + + public abstract static class DestinationCache { private final Map> cache = new HashMap>(); @@ -775,6 +839,22 @@ public class AMQSession_0_8 extends AMQSession