diff options
| author | Rupert Smith <rupertlssmith@apache.org> | 2007-09-24 10:22:32 +0000 |
|---|---|---|
| committer | Rupert Smith <rupertlssmith@apache.org> | 2007-09-24 10:22:32 +0000 |
| commit | 2af12a2b280c6ee0920f361ac39adb0855a5a9cb (patch) | |
| tree | 24e7ec8996363d1b98b75fa49573fe8bb6aded6f /java/client | |
| parent | d8e60633e847a3f7d6f988163dd3163233aaee78 (diff) | |
| download | qpid-python-2af12a2b280c6ee0920f361ac39adb0855a5a9cb.tar.gz | |
Merged revisions 575663-575687,575689-576860,576862-577192,577194-577315,577317-577659,577661-578047,578049-578060,578062-578604 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r575663 | rgodfrey | 2007-09-14 13:43:13 +0100 (Fri, 14 Sep 2007) | 1 line
QPID-600 : Deadlock on connection.close
........
r577931 | rgreig | 2007-09-20 22:26:37 +0100 (Thu, 20 Sep 2007) | 1 line
Adding timeouts to two wait() calls to prevent hanging
........
r578258 | rgreig | 2007-09-21 21:31:18 +0100 (Fri, 21 Sep 2007) | 1 line
QPID-607: dispatcher threads now poll so that the can die when the connection is closed.
........
r578475 | rgreig | 2007-09-22 20:01:59 +0100 (Sat, 22 Sep 2007) | 1 line
QPID-608 Fix the test by adding in creation of the VM broker
........
r578509 | rgreig | 2007-09-22 23:05:30 +0100 (Sat, 22 Sep 2007) | 1 line
QPID-609 : dispatcher thread was being restarted by the code that closed the consumer due to the receipt of a basic.cancel frame. Move the dispatcher shutdown to the end of the consumer close process. Also rename the dispatcher _closed field since it clashes with a field in the container class.
........
r578604 | rgreig | 2007-09-23 22:29:33 +0100 (Sun, 23 Sep 2007) | 4 lines
QPID-589: avoid the deadlock between the session close and the BasicCancelOkHandler by
not sending a BasicCancel when the session is being closed.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@578745 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 92 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 9acc6ada0e..07b8d8d9b7 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -70,11 +70,7 @@ import java.io.IOException; import java.net.ConnectException; import java.nio.channels.UnresolvedAddressException; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.Map; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -92,6 +88,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect */ private final Object _failoverMutex = new Object(); + private final Object _sessionCreationLock = new Object(); + /** * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session * and we must prevent the client from opening too many. Zero means unlimited. @@ -509,6 +507,8 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow) throws JMSException { + synchronized(_sessionCreationLock) + { checkNotClosed(); if (channelLimitReached()) @@ -572,6 +572,7 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return session; } }, this).execute(); + } } private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) @@ -760,44 +761,63 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect public void close(long timeout) throws JMSException { - synchronized (getFailoverMutex()) + close(new ArrayList<AMQSession>(_sessions.values()),timeout); + } + + public void close(List<AMQSession> sessions, long timeout) throws JMSException + { + synchronized(_sessionCreationLock) { - if (!_closed.getAndSet(true)) + if(!sessions.isEmpty()) { - try + AMQSession session = sessions.remove(0); + synchronized(session.getMessageDeliveryLock()) + { + close(sessions, timeout); + } + } + else + { + synchronized (getFailoverMutex()) + { + if (!_closed.getAndSet(true)) { - long startCloseTime = System.currentTimeMillis(); + try + { + long startCloseTime = System.currentTimeMillis(); - _taskPool.shutdown(); - closeAllSessions(null, timeout, startCloseTime); + _taskPool.shutdown(); + closeAllSessions(null, timeout, startCloseTime); - if (!_taskPool.isTerminated()) - { - try + if (!_taskPool.isTerminated()) { - // adjust timeout - long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); + try + { + // adjust timeout + long taskPoolTimeout = adjustTimeout(timeout, startCloseTime); - _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) - { - _logger.info("Interrupted while shutting down connection thread pool."); + _taskPool.awaitTermination(taskPoolTimeout, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _logger.info("Interrupted while shutting down connection thread pool."); + } } - } - // adjust timeout - timeout = adjustTimeout(timeout, startCloseTime); + // adjust timeout + timeout = adjustTimeout(timeout, startCloseTime); - _protocolHandler.closeConnection(timeout); + _protocolHandler.closeConnection(timeout); + } + catch (AMQException e) + { + JMSException jmse = new JMSException("Error closing connection: " + e); + jmse.setLinkedException(e); + throw jmse; + } } - catch (AMQException e) - { - JMSException jmse = new JMSException("Error closing connection: " + e); - jmse.setLinkedException(e); - throw jmse; - } + } } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 74b4cc0618..f873801dc9 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -106,6 +106,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -519,7 +520,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi synchronized (_messageDeliveryLock) { - // We must close down all producers and consumers in an orderly fashion. This is the only method // that can be called from a different thread of control from the one controlling the session. synchronized (_connection.getFailoverMutex()) @@ -666,11 +666,10 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi else { _logger.info("Dispatcher is null so created stopped dispatcher"); - startDistpatcherIfNecessary(true); } - _dispatcher.rejectPending(consumer); + _dispatcher.rejectPending(consumer); } else { @@ -1954,11 +1953,6 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi */ private void closeConsumers(Throwable error) throws JMSException { - if (_dispatcher != null) - { - _dispatcher.close(); - _dispatcher = null; - } // we need to clone the list of consumers since the close() method updates the _consumers collection // which would result in a concurrent modification exception final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values()); @@ -1973,10 +1967,15 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } else { - con.close(); + con.close(false); } } // at this point the _consumers map will be empty + if (_dispatcher != null) + { + _dispatcher.close(); + _dispatcher = null; + } } /** @@ -2557,12 +2556,17 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } + Object getMessageDeliveryLock() + { + return _messageDeliveryLock; + } + /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */ private class Dispatcher extends Thread { /** Track the 'stopped' state of the dispatcher, a session starts in the stopped state. */ - private final AtomicBoolean _closed = new AtomicBoolean(false); + private final AtomicBoolean _dispatcherClosed = new AtomicBoolean(false); private final Object _lock = new Object(); private final AtomicLong _rollbackMark = new AtomicLong(-1); @@ -2578,7 +2582,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi public void close() { - _closed.set(true); + _dispatcherClosed.set(true); interrupt(); // fixme awaitTermination @@ -2673,30 +2677,33 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi try { - while (!_closed.get() && ((message = (UnprocessedMessage) _queue.take()) != null)) + while (!_dispatcherClosed.get()) { - synchronized (_lock) + message = (UnprocessedMessage) _queue.poll(1000, TimeUnit.MILLISECONDS); + if (message != null) { - - while (connectionStopped()) + synchronized (_lock) { - _lock.wait(2000); - } - if (message.getDeliverBody().deliveryTag <= _rollbackMark.get()) - { - rejectMessage(message, true); - } - else - { - synchronized (_messageDeliveryLock) + while (connectionStopped()) { - dispatchMessage(message); + _lock.wait(2000); } - } - } + if (message.getDeliverBody().deliveryTag <= _rollbackMark.get()) + { + rejectMessage(message, true); + } + else + { + synchronized (_messageDeliveryLock) + { + dispatchMessage(message); + } + } + } + } } } catch (InterruptedException e) @@ -2760,7 +2767,7 @@ public class AMQSession extends Closeable implements Session, QueueSession, Topi } } // Don't reject if we're already closing - if (!_closed.get()) + if (!_dispatcherClosed.get()) { rejectMessage(message, true); } diff --git a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java index 0fc39a9318..579b0d9e90 100644 --- a/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java +++ b/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.util; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow @@ -69,10 +70,10 @@ public class FlowControllingBlockingQueue _listener = listener; } - public Object take() throws InterruptedException + public Object poll(long time, TimeUnit unit) throws InterruptedException { - Object o = _queue.take(); - if (_listener != null) + Object o = _queue.poll(time, unit); + if (o != null && _listener != null) { synchronized (_listener) { diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java index 588c82221e..56394fee27 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java @@ -47,12 +47,12 @@ public class ConnectionTest extends TestCase protected void setUp() throws Exception { super.setUp(); -// TransportConnection.createVMBroker(1); + TransportConnection.createVMBroker(1); } protected void tearDown() throws Exception { -// TransportConnection.killVMBroker(1); + TransportConnection.killVMBroker(1); } public void testSimpleConnection() |
