summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-06 10:51:21 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-06 10:51:21 +0000
commit6bf1950a62231f036a4567d17a8d8bcb34a2c410 (patch)
treeaf47509e6bb60132abfc95ea1d56ee83fbcae1f4 /java/client
parent6ca0f3ab3be2f37fc5819e33c21db644497a29ba (diff)
downloadqpid-python-6bf1950a62231f036a4567d17a8d8bcb34a2c410.tar.gz
QPID-3911: Fix deadlock on concurrent invocation of MessageConsumer#close() and Session#rollback() from consumer MessageListener
This patch contains the following changes: - Add synchronization on AMSession#_messageDeliveryLock into MessageConsumer#close() in order to block until message listener in progress has completed(as required in JMS javadoc for MessageConsumer#close()). - Change the session dispatcher to stop messages delivery into consumer local message queue if the consumer in the process of closing. This eliminates the need to stop the dispatcher on rejecting pending messages for closing consumer. - Remove the synchronization on the dispatcher lock from AMQSession.Dispatcher#rejectPending and code to stop the dispatcher, as we are synchronizing on the deliveryLock now and incoming messages are not dispatched into closing consumers anymore. - Add a system test to reproduce the deadlock and verify its resolution. Applied patch from Oleksandr Rudyy <orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1310275 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java5
2 files changed, 11 insertions, 21 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index ce624cb91b..55d3ccb6e7 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -3212,28 +3212,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public void rejectPending(C consumer)
{
- synchronized (_lock)
- {
- boolean stopped = connectionStopped();
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
- if (!stopped)
- {
- setConnectionStopped(true);
- }
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ // closeConsumer
+ consumer.markClosed();
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
- //Let the dispatcher deal with this when it gets to them.
-
- // closeConsumer
- consumer.markClosed();
-
- setConnectionStopped(stopped);
-
- }
}
public void rollback()
@@ -3425,7 +3412,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
final C consumer = _consumers.get(message.getConsumerTag());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed() || consumer.isClosing())
{
if (_dispatcherLogger.isInfoEnabled())
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 0d717a3216..0f8b5717d6 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -593,7 +593,10 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
// no point otherwise as the connection will be gone
if (!_session.isClosed() || _session.isClosing())
{
- sendCancel();
+ synchronized(_session.getMessageDeliveryLock())
+ {
+ sendCancel();
+ }
cleanupQueue();
}
}