summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-06-03 21:29:09 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-06-03 21:29:09 +0000
commit385306d3e315dade5e6366f4f7c9a1fd1c41b482 (patch)
treeac6dfd2edc01d790826b6f977cc6a9b22e5ab118 /java/client/src
parent5cabf4fc55360f09e7ef6db312c7a9c938c8a4fa (diff)
downloadqpid-python-385306d3e315dade5e6366f4f7c9a1fd1c41b482.tar.gz
QPID-2633 : Move destinationConsumer count logic to before the consumer is registered.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@951165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java23
1 files changed, 15 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index b80178baf9..3390dfef8b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1835,12 +1835,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
throw ex;
}
- synchronized (destination)
- {
- _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger());
- _destinationConsumerCount.get(destination).incrementAndGet();
- }
-
return consumer;
}
}, _connection).execute();
@@ -1869,9 +1863,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
Destination dest = consumer.getDestination();
synchronized (dest)
{
- if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ // Provide additional NPE check
+ // This would occur if the consumer was closed before it was
+ // fully opened.
+ if (_destinationConsumerCount.get(dest) != null)
{
- _destinationConsumerCount.remove(dest);
+ if (_destinationConsumerCount.get(dest).decrementAndGet() == 0)
+ {
+ _destinationConsumerCount.remove(dest);
+ }
}
}
@@ -2328,6 +2328,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
// we must register the consumer in the map before we actually start listening
_consumers.put(tagId, consumer);
+ synchronized (consumer.getDestination())
+ {
+ _destinationConsumerCount.putIfAbsent(consumer.getDestination(), new AtomicInteger());
+ _destinationConsumerCount.get(consumer.getDestination()).incrementAndGet();
+ }
+
+
try
{
sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);