diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-06-03 21:29:09 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-06-03 21:29:09 +0000 |
| commit | 385306d3e315dade5e6366f4f7c9a1fd1c41b482 (patch) | |
| tree | ac6dfd2edc01d790826b6f977cc6a9b22e5ab118 /java/client/src | |
| parent | 5cabf4fc55360f09e7ef6db312c7a9c938c8a4fa (diff) | |
| download | qpid-python-385306d3e315dade5e6366f4f7c9a1fd1c41b482.tar.gz | |
QPID-2633 : Move destinationConsumer count logic to before the consumer is registered.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@951165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/AMQSession.java | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index b80178baf9..3390dfef8b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1835,12 +1835,6 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic throw ex; } - synchronized (destination) - { - _destinationConsumerCount.putIfAbsent(destination, new AtomicInteger()); - _destinationConsumerCount.get(destination).incrementAndGet(); - } - return consumer; } }, _connection).execute(); @@ -1869,9 +1863,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic Destination dest = consumer.getDestination(); synchronized (dest) { - if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + // Provide additional NPE check + // This would occur if the consumer was closed before it was + // fully opened. + if (_destinationConsumerCount.get(dest) != null) { - _destinationConsumerCount.remove(dest); + if (_destinationConsumerCount.get(dest).decrementAndGet() == 0) + { + _destinationConsumerCount.remove(dest); + } } } @@ -2328,6 +2328,13 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic // we must register the consumer in the map before we actually start listening _consumers.put(tagId, consumer); + synchronized (consumer.getDestination()) + { + _destinationConsumerCount.putIfAbsent(consumer.getDestination(), new AtomicInteger()); + _destinationConsumerCount.get(consumer.getDestination()).incrementAndGet(); + } + + try { sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId); |
