diff options
Diffstat (limited to 'java/client')
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQConnection.java | 80 | ||||
-rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java | 5 |
2 files changed, 37 insertions, 48 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 6b3b4601d9..d9298abd0f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -81,7 +81,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class); private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong(); - private static final long DEFAULT_CLOSE_TIMEOUT = 2000l; + private static final long DEFAULT_CLOSE_TIMEOUT = Long.getLong(ClientProperties.QPID_CLOSE_TIMEOUT, + ClientProperties.DEFAULT_CLOSE_TIMEOUT); private final long _connectionNumber; @@ -880,44 +881,19 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - long startCloseTime = System.currentTimeMillis(); - - closeAllSessions(null, timeout, startCloseTime); - - //This MUST occur after we have successfully closed all Channels/Sessions - _taskPool.shutdown(); - - if (!_taskPool.isTerminated()) + try { - try - { - // adjust timeout - long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - - _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); - } + closeAllSessions(null, timeout); } - - // adjust timeout - timeout = adjustTimeout(timeout, startCloseTime); - //If the taskpool hasn't shutdown by now then give it shutdownNow. - // This will interupt any running tasks. - if (!_taskPool.isTerminated()) + finally { - List<Runnable> tasks = _taskPool.shutdownNow(); - for (Runnable r : tasks) - { - _logger.warn("Connection close forced taskpool to prevent execution:" + r); - } + //This MUST occur after we have successfully closed all Channels/Sessions + shutdownTaskPool(timeout); } } catch (JMSException e) { - _logger.error("error:", e); + _logger.error("Error closing connection", e); JMSException jmse = new JMSException("Error closing connection: " + e); jmse.setLinkedException(e); jmse.initCause(e); @@ -939,16 +915,32 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect } } - private long adjustTimeout(long timeout, long startTime) + private void shutdownTaskPool(final long timeout) { - long now = System.currentTimeMillis(); - timeout -= now - startTime; - if (timeout < 0) + _taskPool.shutdown(); + + if (!_taskPool.isTerminated()) { - timeout = 0; + try + { + _taskPool.awaitTermination(timeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } } - return timeout; + //If the taskpool hasn't shutdown by now then give it shutdownNow. + // This will interrupt any running tasks. + if (!_taskPool.isTerminated()) + { + List<Runnable> tasks = _taskPool.shutdownNow(); + for (Runnable r : tasks) + { + _logger.warn("Connection close forced taskpool to prevent execution:" + r); + } + } } /** @@ -976,7 +968,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect * @param cause if not null, the error that is causing this shutdown <p/> The caller must hold the failover mutex * before calling this method. */ - private void closeAllSessions(Throwable cause, long timeout, long starttime) throws JMSException + private void closeAllSessions(Throwable cause, long timeout) throws JMSException { final LinkedList sessionCopy = new LinkedList(_sessions.values()); final Iterator it = sessionCopy.iterator(); @@ -992,11 +984,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - if (starttime != -1) - { - timeout = adjustTimeout(timeout, starttime); - } - session.close(timeout); } catch (JMSException e) @@ -1042,7 +1029,6 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { - // TODO Auto-generated method stub checkNotClosed(); throw new JmsNotImplementedException(); @@ -1322,7 +1308,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect { try { - closeAllSessions(cause, -1, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. + closeAllSessions(cause, -1); // FIXME: when doing this end up with RejectedExecutionException from executor. } catch (JMSException e) { @@ -1444,7 +1430,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public AMQShortString getTemporaryQueueExchangeName() { - return _temporaryQueueExchangeName; // To change body of created methods use File | Settings | File Templates. + return _temporaryQueueExchangeName; } public void setTemporaryTopicExchangeName(AMQShortString temporaryTopicExchangeName) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 8a15fffe84..d86a2739f2 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -32,6 +32,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Destination; @@ -96,6 +97,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe /** Flow control */ private FlowControlIndicator _flowControl = new FlowControlIndicator(); + private final AtomicBoolean _creditChanged = new AtomicBoolean(); /** * Creates a new session on a connection. @@ -847,6 +849,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class); + _creditChanged.set(true); return true; } else @@ -863,7 +866,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe int acknowledgeMode = getAcknowledgeMode(); boolean manageCredit = acknowledgeMode == javax.jms.Session.CLIENT_ACKNOWLEDGE || acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED; - if(manageCredit) + if(manageCredit && _creditChanged.compareAndSet(true,false)) { new FailoverNoopSupport<>( new FailoverProtectedOperation<Void, AMQException>() |